from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.l2 import Ether, ARP
from scapy.data import IP_PROTOS
+from scapy.packet import bind_layers
from util import ppp
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
cls.snat_addr = '10.0.0.3'
+ cls.ipfix_src_port = 4739
+ cls.ipfix_domain_id = 1
- cls.create_pg_interfaces(range(8))
+ cls.create_pg_interfaces(range(9))
cls.interfaces = list(cls.pg_interfaces[0:4])
for i in cls.interfaces:
i.config_ip4()
i.resolve_arp()
- cls.pg0.generate_remote_hosts(2)
+ cls.pg0.generate_remote_hosts(3)
cls.pg0.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
i.resolve_arp()
cls.pg7.admin_up()
+ cls.pg8.admin_up()
except Exception:
super(TestSNAT, cls).tearDownClass()
"""
Clear SNAT configuration.
"""
+ # I found no elegant way to do this
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index,
+ is_add=0)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index,
+ is_add=0)
+
+ for intf in [self.pg7, self.pg8]:
+ neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
+ for n in neighbors:
+ self.vapi.ip_neighbor_add_del(intf.sw_if_index,
+ n.mac_address,
+ n.ip_address,
+ is_add=0)
+
if self.pg7.has_ip4_config:
self.pg7.unconfig_ip4()
for intf in interfaces:
self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
- self.vapi.snat_ipfix(enable=0)
+ self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
+ domain_id=self.ipfix_domain_id)
+ self.ipfix_src_port = 4739
+ self.ipfix_domain_id = 1
interfaces = self.vapi.snat_interface_dump()
for intf in interfaces:
self.icmp_id_in])
def test_hairpinning(self):
- """ SNAT hairpinning """
+ """ SNAT hairpinning - 1:1 NAT with port"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
self.logger.error(ppp("Unexpected or invalid packet:"), p)
raise
+ def test_hairpinning2(self):
+ """ SNAT hairpinning - 1:1 NAT"""
+
+ server1_nat_ip = "10.0.0.10"
+ server2_nat_ip = "10.0.0.11"
+ host = self.pg0.remote_hosts[0]
+ server1 = self.pg0.remote_hosts[1]
+ server2 = self.pg0.remote_hosts[2]
+ server_tcp_port = 22
+ server_udp_port = 20
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # add static mapping for servers
+ self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
+ self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
+
+ # host to server1
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ UDP(sport=self.udp_port_in, dport=server_udp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, self.snat_addr)
+ self.assertEqual(packet[IP].dst, server1.ip4)
+ if packet.haslayer(TCP):
+ self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].dport, server_tcp_port)
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].dport, server_udp_port)
+ self.udp_port_out = packet[UDP].sport
+ else:
+ self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP].id
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server1 to host
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ UDP(sport=server_udp_port, dport=self.udp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ ICMP(id=self.icmp_id_out, type='echo-reply'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server1_nat_ip)
+ self.assertEqual(packet[IP].dst, host.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, server_tcp_port)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, server_udp_port)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server2 to server1
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ UDP(sport=self.udp_port_in, dport=server_udp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server2_nat_ip)
+ self.assertEqual(packet[IP].dst, server1.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].dport, server_tcp_port)
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].dport, server_udp_port)
+ self.udp_port_out = packet[UDP].sport
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP].id
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server1 to server2
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ UDP(sport=server_udp_port, dport=self.udp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ ICMP(id=self.icmp_id_out, type='echo-reply'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server1_nat_ip)
+ self.assertEqual(packet[IP].dst, server2.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, server_tcp_port)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, server_udp_port)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
def test_max_translations_per_user(self):
""" MAX translations per user - recycle the least recently used """
def test_ipfix_nat44_sess(self):
""" S-NAT IPFIX logging NAT44 session created/delted """
+ self.ipfix_domain_id = 10
+ self.ipfix_src_port = 20202
+ colector_port = 30303
+ bind_layers(UDP, IPFIX, dport=30303)
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
src_address=self.pg3.local_ip4n,
path_mtu=512,
- template_interval=10)
- self.vapi.snat_ipfix()
+ template_interval=10,
+ collector_port=colector_port)
+ self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
# first load template
for p in capture:
self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, colector_port)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
src_address=self.pg3.local_ip4n,
path_mtu=512,
template_interval=10)
- self.vapi.snat_ipfix()
+ self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
# first load template
for p in capture:
self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
capture = self.pg2.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip1)
+ def test_dynamic_ipless_interfaces(self):
+ """ SNAT interfaces without configured ip dynamic map """
+
+ self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4n,
+ is_static=1)
+ self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4n,
+ is_static=1)
+
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index)
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg8, self.snat_addr)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ def test_static_ipless_interfaces(self):
+ """ SNAT 1:1 NAT interfaces without configured ip """
+
+ self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4n,
+ is_static=1)
+ self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4n,
+ is_static=1)
+
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index)
+
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+ is_inside=0)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg8)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture, self.snat_addr, True)
+
+ def test_static_with_port_ipless_interfaces(self):
+ """ SNAT 1:1 NAT with port interfaces without configured ip """
+
+ self.tcp_port_out = 30606
+ self.udp_port_out = 30607
+ self.icmp_id_out = 30608
+
+ self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4n,
+ is_static=1)
+ self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4n,
+ is_static=1)
+
+ self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg7.remote_ip4n,
+ next_hop_sw_if_index=self.pg7.sw_if_index)
+ self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
+ dst_address_length=32,
+ next_hop_address=self.pg8.remote_ip4n,
+ next_hop_sw_if_index=self.pg8.sw_if_index)
+
+ self.snat_add_address(self.snat_addr)
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+ self.tcp_port_in, self.tcp_port_out,
+ proto=IP_PROTOS.tcp)
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+ self.udp_port_in, self.udp_port_out,
+ proto=IP_PROTOS.udp)
+ self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
+ self.icmp_id_in, self.icmp_id_out,
+ proto=IP_PROTOS.icmp)
+ self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
+ is_inside=0)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg8)
+ self.pg8.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg7.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg7)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg7, self.pg8)
+ self.pg7.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg8.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
def tearDown(self):
super(TestSNAT, self).tearDown()
if not self.vpp_dead:
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- UDP(sport=3000, dport=3000))
+ UDP(sport=3001, dport=3002))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.assert_nothing_captured()
+ # verify ICMP error packet
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ self.assertTrue(p.haslayer(ICMP))
+ icmp = p[ICMP]
+ self.assertEqual(icmp.type, 3)
+ self.assertEqual(icmp.code, 1)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ self.assertEqual(inner_ip[UDPerror].sport, 3001)
+ self.assertEqual(inner_ip[UDPerror].dport, 3002)
+
dms = self.vapi.snat_det_map_dump()
self.assertEqual(1000, dms[0].ses_num)