proto,
is_add)
- def snat_add_address(self, ip, is_add=1):
+ def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
"""
Add/delete S-NAT address
:param is_add: 1 if add, 0 if delete (Default add)
"""
snat_addr = socket.inet_pton(socket.AF_INET, ip)
- self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)
+ self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
+ vrf_id=vrf_id)
def test_dynamic(self):
""" SNAT dynamic translation test """
src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
- """ SNAT handling of error respones to client packets with TTL=2 """
+ """ SNAT handling of error responses to client packets with TTL=2 """
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
- """ SNAT handling of error respones to server packets with TTL=2 """
+ """ SNAT handling of error responses to server packets with TTL=2 """
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out_with_icmp_errors(capture)
+ def test_ping_out_interface_from_outside(self):
+ """ Ping SNAT out interface from outside """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
+ ICMP(id=self.icmp_id_out, type='echo-request'))
+ pkts = [p]
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.assertEqual(1, len(capture))
+ packet = capture[0]
+ try:
+ self.assertEqual(packet[IP].src, self.pg1.local_ip4)
+ self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.assertEqual(packet[ICMP].type, 0) # echo reply
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
+ raise
+
def test_static_in(self):
""" SNAT 1:1 NAT initialized from inside network """
capture = self.pg5.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg5)
+ # pg5 session dump
+ addresses = self.vapi.snat_address_dump()
+ self.assertEqual(len(addresses), 1)
+ sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
+ self.assertEqual(len(sessions), 3)
+ for session in sessions:
+ self.assertFalse(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg5.remote_ip4n)
+ self.assertEqual(session.outside_ip_address,
+ addresses[0].ip_address)
+ self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
+ self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
+ self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
+ self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
+ self.assertEqual(sessions[1].inside_port, self.udp_port_in)
+ self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
+ self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
+ self.assertEqual(sessions[1].outside_port, self.udp_port_out)
+ self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
+
# in2out 3rd interface
pkts = self.create_stream_in(self.pg6, self.pg3)
self.pg6.add_stream(pkts)
capture = self.pg6.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg6)
+ # general user and session dump verifications
+ users = self.vapi.snat_user_dump()
+ self.assertTrue(len(users) >= 3)
+ addresses = self.vapi.snat_address_dump()
+ self.assertEqual(len(addresses), 1)
+ for user in users:
+ sessions = self.vapi.snat_user_session_dump(user.ip_address,
+ user.vrf_id)
+ for session in sessions:
+ self.assertEqual(user.ip_address, session.inside_ip_address)
+ self.assertTrue(session.total_bytes > session.total_pkts > 0)
+ self.assertTrue(session.protocol in
+ [IP_PROTOS.tcp, IP_PROTOS.udp,
+ IP_PROTOS.icmp])
+
+ # pg4 session dump
+ sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
+ self.assertTrue(len(sessions) >= 4)
+ for session in sessions:
+ self.assertFalse(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg4.remote_ip4n)
+ self.assertEqual(session.outside_ip_address,
+ addresses[0].ip_address)
+
+ # pg6 session dump
+ sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
+ self.assertTrue(len(sessions) >= 3)
+ for session in sessions:
+ self.assertTrue(session.is_static)
+ self.assertEqual(session.inside_ip_address[0:4],
+ self.pg6.remote_ip4n)
+ self.assertEqual(map(ord, session.outside_ip_address[0:4]),
+ map(int, static_nat_ip.split('.')))
+ self.assertTrue(session.inside_port in
+ [self.tcp_port_in, self.udp_port_in,
+ self.icmp_id_in])
+
def test_hairpinning(self):
""" SNAT hairpinning """
self.pg_start()
capture = self.pg1.get_capture(0)
+ def test_vrf_mode(self):
+ """ S-NAT tenant VRF aware address pool mode """
+
+ vrf_id1 = 1
+ vrf_id2 = 2
+ nat_ip1 = "10.0.0.10"
+ nat_ip2 = "10.0.0.11"
+
+ self.pg0.unconfig_ip4()
+ self.pg1.unconfig_ip4()
+ self.pg0.set_table_ip4(vrf_id1)
+ self.pg1.set_table_ip4(vrf_id2)
+ self.pg0.config_ip4()
+ self.pg1.config_ip4()
+
+ self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
+ self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
+ is_inside=0)
+
+ # first VRF
+ pkts = self.create_stream_in(self.pg0, self.pg2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip1)
+
+ # second VRF
+ pkts = self.create_stream_in(self.pg1, self.pg2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip2)
+
+ def test_vrf_feature_independent(self):
+ """ S-NAT tenant VRF independent address pool mode """
+
+ nat_ip1 = "10.0.0.10"
+ nat_ip2 = "10.0.0.11"
+
+ self.snat_add_address(nat_ip1)
+ self.snat_add_address(nat_ip2)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
+ is_inside=0)
+
+ # first VRF
+ pkts = self.create_stream_in(self.pg0, self.pg2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip1)
+
+ # second VRF
+ pkts = self.create_stream_in(self.pg1, self.pg2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip1)
+
def tearDown(self):
super(TestSNAT, self).tearDown()
if not self.vpp_dead:
self.clear_snat()
+class TestDeterministicNAT(VppTestCase):
+ """ Deterministic NAT Test Cases """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestDeterministicNAT, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestDeterministicNAT, cls).setUpClass()
+
+ try:
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ except Exception:
+ super(TestDeterministicNAT, cls).tearDownClass()
+ raise
+
+ def test_deterministic_mode(self):
+ """ S-NAT run deterministic mode """
+ in_addr = '172.16.255.0'
+ out_addr = '172.17.255.50'
+ in_addr_t = '172.16.255.20'
+ in_addr_n = socket.inet_aton(in_addr)
+ out_addr_n = socket.inet_aton(out_addr)
+ in_addr_t_n = socket.inet_aton(in_addr_t)
+ in_plen = 24
+ out_plen = 32
+
+ snat_config = self.vapi.snat_show_config()
+ self.assertEqual(1, snat_config.deterministic)
+
+ self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
+
+ rep1 = self.vapi.snat_det_forward(in_addr_t_n)
+ self.assertEqual(rep1.out_addr[:4], out_addr_n)
+ rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
+ self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
+
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ self.assertEqual(len(deterministic_mappings), 1)
+ dsm = deterministic_mappings[0]
+ self.assertEqual(in_addr_n, dsm.in_addr[:4])
+ self.assertEqual(in_plen, dsm.in_plen)
+ self.assertEqual(out_addr_n, dsm.out_addr[:4])
+ self.assertEqual(out_plen, dsm.out_plen)
+
+ def clear_snat(self):
+ """
+ Clear SNAT configuration.
+ """
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ for dsm in deterministic_mappings:
+ self.vapi.snat_add_det_map(dsm.in_addr,
+ dsm.in_plen,
+ dsm.out_addr,
+ dsm.out_plen,
+ is_add=0)
+
+ def tearDown(self):
+ super(TestDeterministicNAT, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show snat detail"))
+ self.clear_snat()
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)