cls.icmp_id_out = 6305
cls.snat_addr = '10.0.0.3'
- cls.create_pg_interfaces(range(7))
+ cls.create_pg_interfaces(range(8))
cls.interfaces = list(cls.pg_interfaces[0:4])
for i in cls.interfaces:
i.admin_up()
i.resolve_arp()
+ cls.pg7.admin_up()
+
except Exception:
super(TestSNAT, cls).tearDownClass()
raise
if same_port:
self.assertEqual(packet[TCP].sport, self.tcp_port_in)
else:
- self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertNotEqual(
+ packet[TCP].sport, self.tcp_port_in)
self.tcp_port_out = packet[TCP].sport
elif packet.haslayer(UDP):
if same_port:
self.assertEqual(packet[UDP].sport, self.udp_port_in)
else:
- self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertNotEqual(
+ packet[UDP].sport, self.udp_port_in)
self.udp_port_out = packet[UDP].sport
else:
if same_port:
"""
Clear SNAT configuration.
"""
+ interfaces = self.vapi.snat_interface_addr_dump()
+ for intf in interfaces:
+ self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
+
interfaces = self.vapi.snat_interface_dump()
for intf in interfaces:
self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
addr_only = 0
l_ip = socket.inet_pton(socket.AF_INET, local_ip)
e_ip = socket.inet_pton(socket.AF_INET, external_ip)
- self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port,
- addr_only, vrf_id, is_add)
+ self.vapi.snat_add_static_mapping(
+ l_ip,
+ e_ip,
+ local_port,
+ external_port,
+ addr_only,
+ vrf_id,
+ is_add)
def snat_add_address(self, ip, is_add=1):
"""
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_in(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip, True)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
def test_static_vrf_aware(self):
self.pg4.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture, nat_ip1, True)
# inside interface VRF don't match SNAT static mapping VRF (packets
self.pg3.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
- """SNAT multiple inside interfaces with non-overlapping address space"""
+ """
+ SNAT multiple inside interfaces with non-overlapping address space
+ """
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 1st interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
+ capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
# in2out 2nd interface
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 2nd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture()
+ capture = self.pg1.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg1)
# in2out 3rd interface
self.pg2.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 3rd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg2.get_capture()
+ capture = self.pg2.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg2)
def test_inside_overlapping_interfaces(self):
self.pg4.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 1st interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg4.get_capture()
+ capture = self.pg4.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg4)
# in2out 2nd interface
self.pg5.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 2nd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg5.get_capture()
+ capture = self.pg5.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg5)
# in2out 3rd interface
self.pg6.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
+ capture = self.pg3.get_capture(len(pkts))
self.verify_capture_out(capture)
# out2in 3rd interface
self.pg3.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg6.get_capture()
+ capture = self.pg6.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg6)
def test_hairpinning(self):
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
- self.assertEqual(1, len(capture))
+ capture = self.pg0.get_capture(1)
p = capture[0]
try:
ip = p[IP]
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg0.get_capture()
- self.assertEqual(1, len(capture))
+ capture = self.pg0.get_capture(1)
p = capture[0]
try:
ip = p[IP]
self.pg_start()
# verify number of translated packet
- capture = self.pg1.get_capture()
- self.assertEqual(pkts_num, len(capture))
+ self.pg1.get_capture(pkts_num)
+
+ def test_interface_addr(self):
+ """ Acquire SNAT addresses from interface """
+ self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
+
+ # no address in NAT pool
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT address pool
+ self.pg7.config_ip4()
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(1, len(adresses))
+
+ # remove interface address and check NAT address pool
+ self.pg7.unconfig_ip4()
+ adresses = self.vapi.snat_address_dump()
+ self.assertEqual(0, len(adresses))
def tearDown(self):
super(TestSNAT, self).tearDown()