import socket
import unittest
-from logging import *
from framework import VppTestCase, VppTestRunner
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.l2 import Ether
+from util import ppp
class TestSNAT(VppTestCase):
i.config_ip4()
i.resolve_arp()
+ cls.pg0.generate_remote_hosts(2)
+ cls.pg0.configure_ipv4_neighbors()
+
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
for i in cls.overlapping_interfaces:
:param dst_ip: Destination IP address (Default use global SNAT address)
"""
if dst_ip is None:
- dst_ip=self.snat_addr
+ dst_ip = self.snat_addr
pkts = []
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
self.icmp_id_out = packet[ICMP].id
except:
- error("Unexpected or invalid packet (outside network):")
- error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
raise
def verify_capture_in(self, capture, in_if, packet_num=3):
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
- error("Unexpected or invalid packet (inside network):")
- error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
raise
def clear_snat(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_in(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip, True)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
def test_static_vrf_aware(self):
self.pg4.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip1, True)
# inside interface VRF don't match SNAT static mapping VRF (packets
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
- self.verify_capture_out(capture, packet_num=0)
+ self.pg3.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
- """ SNAT multiple inside interfaces with non-overlapping address space """
+ """SNAT multiple inside interfaces with non-overlapping address space"""
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 1st interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out 2nd interface
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 2nd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg1)
# in2out 3rd interface
self.pg2.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 3rd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg2.get_capture()
+ capture = self.pg2.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg2)
def test_inside_overlapping_interfaces(self):
self.pg4.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 1st interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg4.get_capture()
+ capture = self.pg4.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg4)
# in2out 2nd interface
self.pg5.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 2nd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg5.get_capture()
+ capture = self.pg5.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg5)
# in2out 3rd interface
self.pg6.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 3rd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg6.get_capture()
+ capture = self.pg6.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg6)
+ def test_hairpinning(self):
+ """ SNAT hairpinning """
+
+ host = self.pg0.remote_hosts[0]
+ server = self.pg0.remote_hosts[1]
+ host_in_port = 1234
+ host_out_port = 0
+ server_in_port = 5678
+ server_out_port = 8765
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ # add static mapping for server
+ self.snat_add_static_mapping(server.ip4, self.snat_addr,
+ server_in_port, server_out_port)
+
+ # send packet from host to server
+ p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
+ IP(src=host.ip4, dst=self.snat_addr) /
+ TCP(sport=host_in_port, dport=server_out_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.snat_addr)
+ self.assertEqual(ip.dst, server.ip4)
+ self.assertNotEqual(tcp.sport, host_in_port)
+ self.assertEqual(tcp.dport, server_in_port)
+ host_out_port = tcp.sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # send reply from server to host
+ p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
+ IP(src=server.ip4, dst=self.snat_addr) /
+ TCP(sport=server_in_port, dport=host_out_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.snat_addr)
+ self.assertEqual(ip.dst, host.ip4)
+ self.assertEqual(tcp.sport, server_out_port)
+ self.assertEqual(tcp.dport, host_in_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:"), p)
+ raise
+
+ def test_max_translations_per_user(self):
+ """ MAX translations per user - recycle the least recently used """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # get maximum number of translations per user
+ snat_config = self.vapi.snat_show_config()
+
+ # send more than maximum number of translations per user packets
+ pkts_num = snat_config.max_translations_per_user + 5
+ pkts = []
+ for port in range(0, pkts_num):
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=1025 + port))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # verify number of translated packet
+ self.pg1.get_capture(pkts_num)
+
def tearDown(self):
super(TestSNAT, self).tearDown()
if not self.vpp_dead: