from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
-class TestSNAT(VppTestCase):
- """ SNAT Test Cases """
+class MethodHolder(VppTestCase):
+ """ SNAT create capture and verify method holder """
@classmethod
def setUpClass(cls):
- super(TestSNAT, cls).setUpClass()
-
- try:
- cls.tcp_port_in = 6303
- cls.tcp_port_out = 6303
- cls.udp_port_in = 6304
- cls.udp_port_out = 6304
- cls.icmp_id_in = 6305
- cls.icmp_id_out = 6305
- cls.snat_addr = '10.0.0.3'
-
- cls.create_pg_interfaces(range(8))
- cls.interfaces = list(cls.pg_interfaces[0:4])
-
- for i in cls.interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
-
- cls.pg0.generate_remote_hosts(2)
- cls.pg0.configure_ipv4_neighbors()
-
- cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
-
- cls.pg4._local_ip4 = "172.16.255.1"
- cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
- cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
- cls.pg4.set_table_ip4(10)
- cls.pg5._local_ip4 = "172.16.255.3"
- cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
- cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
- cls.pg5.set_table_ip4(10)
- cls.pg6._local_ip4 = "172.16.255.1"
- cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
- cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
- cls.pg6.set_table_ip4(20)
- for i in cls.overlapping_interfaces:
- i.config_ip4()
- i.admin_up()
- i.resolve_arp()
-
- cls.pg7.admin_up()
+ super(MethodHolder, cls).setUpClass()
- except Exception:
- super(TestSNAT, cls).tearDownClass()
- raise
+ def tearDown(self):
+ super(MethodHolder, self).tearDown()
def create_stream_in(self, in_if, out_if, ttl=64):
"""
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
+
+class TestSNAT(MethodHolder):
+ """ SNAT Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSNAT, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.snat_addr = '10.0.0.3'
+
+ cls.create_pg_interfaces(range(8))
+ cls.interfaces = list(cls.pg_interfaces[0:4])
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.pg0.generate_remote_hosts(2)
+ cls.pg0.configure_ipv4_neighbors()
+
+ cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
+
+ cls.pg4._local_ip4 = "172.16.255.1"
+ cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
+ cls.pg4.set_table_ip4(10)
+ cls.pg5._local_ip4 = "172.16.255.3"
+ cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
+ cls.pg5.set_table_ip4(10)
+ cls.pg6._local_ip4 = "172.16.255.1"
+ cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
+ cls.pg6.set_table_ip4(20)
+ for i in cls.overlapping_interfaces:
+ i.config_ip4()
+ i.admin_up()
+ i.resolve_arp()
+
+ cls.pg7.admin_up()
+
+ except Exception:
+ super(TestSNAT, cls).tearDownClass()
+ raise
+
def clear_snat(self):
"""
Clear SNAT configuration.
self.clear_snat()
+class TestDeterministicNAT(MethodHolder):
+ """ Deterministic NAT Test Cases """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestDeterministicNAT, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestDeterministicNAT, cls).setUpClass()
+
+ try:
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ except Exception:
+ super(TestDeterministicNAT, cls).tearDownClass()
+ raise
+
+ def test_deterministic_mode(self):
+ """ S-NAT run deterministic mode """
+ in_addr = '172.16.255.0'
+ out_addr = '172.17.255.50'
+ in_addr_t = '172.16.255.20'
+ in_addr_n = socket.inet_aton(in_addr)
+ out_addr_n = socket.inet_aton(out_addr)
+ in_addr_t_n = socket.inet_aton(in_addr_t)
+ in_plen = 24
+ out_plen = 32
+
+ snat_config = self.vapi.snat_show_config()
+ self.assertEqual(1, snat_config.deterministic)
+
+ self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
+
+ rep1 = self.vapi.snat_det_forward(in_addr_t_n)
+ self.assertEqual(rep1.out_addr[:4], out_addr_n)
+ rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
+ self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
+
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ self.assertEqual(len(deterministic_mappings), 1)
+ dsm = deterministic_mappings[0]
+ self.assertEqual(in_addr_n, dsm.in_addr[:4])
+ self.assertEqual(in_plen, dsm.in_plen)
+ self.assertEqual(out_addr_n, dsm.out_addr[:4])
+ self.assertEqual(out_plen, dsm.out_plen)
+
+ self.clear_snat()
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ self.assertEqual(len(deterministic_mappings), 0)
+
+ def test_set_timeouts(self):
+ """ Set deterministic NAT timeouts """
+ timeouts_before = self.vapi.snat_det_get_timeouts()
+
+ self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
+ timeouts_before.tcp_established + 10,
+ timeouts_before.tcp_transitory + 10,
+ timeouts_before.icmp + 10)
+
+ timeouts_after = self.vapi.snat_det_get_timeouts()
+
+ self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
+ self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
+ self.assertNotEqual(timeouts_before.tcp_established,
+ timeouts_after.tcp_established)
+ self.assertNotEqual(timeouts_before.tcp_transitory,
+ timeouts_after.tcp_transitory)
+
+ def clear_snat(self):
+ """
+ Clear SNAT configuration.
+ """
+ self.vapi.snat_det_set_timeouts()
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ for dsm in deterministic_mappings:
+ self.vapi.snat_add_det_map(dsm.in_addr,
+ dsm.in_plen,
+ dsm.out_addr,
+ dsm.out_plen,
+ is_add=0)
+
+ def tearDown(self):
+ super(TestDeterministicNAT, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show snat detail"))
+ self.clear_snat()
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)