import socket
import unittest
-from logging import *
from framework import VppTestCase, VppTestRunner
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.l2 import Ether
+from util import ppp
class TestSNAT(VppTestCase):
i.config_ip4()
i.resolve_arp()
+ cls.pg0.generate_remote_hosts(2)
+ cls.pg0.configure_ipv4_neighbors()
+
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
for i in cls.overlapping_interfaces:
:param dst_ip: Destination IP address (Default use global SNAT address)
"""
if dst_ip is None:
- dst_ip=self.snat_addr
+ dst_ip = self.snat_addr
pkts = []
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
self.icmp_id_out = packet[ICMP].id
except:
- error("Unexpected or invalid packet (outside network):")
- error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
raise
def verify_capture_in(self, capture, in_if, packet_num=3):
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
- error("Unexpected or invalid packet (inside network):")
- error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
raise
def clear_snat(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
- self.verify_capture_out(capture, packet_num=0)
+ self.pg3.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
- """ SNAT multiple inside interfaces with non-overlapping address space """
+ """SNAT multiple inside interfaces with non-overlapping address space"""
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
capture = self.pg6.get_capture()
self.verify_capture_in(capture, self.pg6)
+ def test_hairpinning(self):
+ """ SNAT hairpinning """
+
+ host = self.pg0.remote_hosts[0]
+ server = self.pg0.remote_hosts[1]
+ host_in_port = 1234
+ host_out_port = 0
+ server_in_port = 5678
+ server_out_port = 8765
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ # add static mapping for server
+ self.snat_add_static_mapping(server.ip4, self.snat_addr,
+ server_in_port, server_out_port)
+
+ # send packet from host to server
+ p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
+ IP(src=host.ip4, dst=self.snat_addr) /
+ TCP(sport=host_in_port, dport=server_out_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.assertEqual(1, len(capture))
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.snat_addr)
+ self.assertEqual(ip.dst, server.ip4)
+ self.assertNotEqual(tcp.sport, host_in_port)
+ self.assertEqual(tcp.dport, server_in_port)
+ host_out_port = tcp.sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # send reply from server to host
+ p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
+ IP(src=server.ip4, dst=self.snat_addr) /
+ TCP(sport=server_in_port, dport=host_out_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.assertEqual(1, len(capture))
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.snat_addr)
+ self.assertEqual(ip.dst, host.ip4)
+ self.assertEqual(tcp.sport, server_out_port)
+ self.assertEqual(tcp.dport, host_in_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:"), p)
+ raise
+
def tearDown(self):
super(TestSNAT, self).tearDown()
if not self.vpp_dead: