+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg5.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client (both VRF1)
+ p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
+ IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg5.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, external_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # dynamic NAT from VRF1 to VRF0 (output-feature)
+ p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
+ IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=2345, dport=22))
+ self.pg5.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertNotEqual(tcp.sport, 2345)
+ self.assert_packet_checksums_valid(p)
+ port = tcp.sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=22, dport=port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg5.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg5.remote_ip4)
+ self.assertEqual(tcp.dport, 2345)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client VRF1 to service VRF0
+ p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
+ IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
+ TCP(sport=12346, dport=external_port))
+ self.pg6.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service VRF0 back to client VRF1
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
+ TCP(sport=local_port, dport=12346))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg0.local_ip4)
+ self.assertEqual(tcp.sport, external_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client VRF0 to service VRF1
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=external_addr) /
+ TCP(sport=12347, dport=external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg5.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg5.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service VRF1 back to client VRF0
+ p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
+ IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
+ TCP(sport=local_port, dport=12347))
+ self.pg5.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, external_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client to server (both VRF1, no translation)
+ p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
+ IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
+ TCP(sport=12348, dport=local_port))
+ self.pg6.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg5.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg5.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from server back to client (both VRF1, no translation)
+ p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
+ IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
+ TCP(sport=local_port, dport=12348))
+ self.pg5.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg5.remote_ip4)
+ self.assertEqual(tcp.sport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client VRF1 to server VRF0 (no translation)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
+ TCP(sport=local_port, dport=12349))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg0.remote_ip4)
+ self.assertEqual(tcp.sport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from server VRF0 back to client VRF1 (no translation)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
+ TCP(sport=local_port, dport=12349))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg0.remote_ip4)
+ self.assertEqual(tcp.sport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client VRF0 to server VRF1 (no translation)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
+ TCP(sport=12344, dport=local_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg5.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg5.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from server VRF1 back to client VRF0 (no translation)
+ p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
+ IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
+ TCP(sport=local_port, dport=12344))
+ self.pg5.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg5.remote_ip4)
+ self.assertEqual(tcp.sport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_session_timeout(self):
+ """ NAT44 session timeouts """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.nat_set_timeouts(icmp=5)
+
+ max_sessions = 1000
+ pkts = []
+ for i in range(0, max_sessions):
+ src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=src, dst=self.pg1.remote_ip4) /
+ ICMP(id=1025, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(max_sessions)
+
+ sleep(10)
+
+ pkts = []
+ for i in range(0, max_sessions):
+ src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=src, dst=self.pg1.remote_ip4) /
+ ICMP(id=1026, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(max_sessions)
+
+ nsessions = 0
+ users = self.vapi.nat44_user_dump()
+ for user in users:
+ nsessions = nsessions + user.nsessions
+ self.assertLess(nsessions, 2 * max_sessions)
+
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_session_limit_per_user(self):
+ """ Maximum sessions per user limit """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
+ src_address=self.pg2.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+
+ # get maximum number of translations per user
+ nat44_config = self.vapi.nat_show_config()
+
+ pkts = []
+ for port in range(0, nat44_config.max_translations_per_user):
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=1025 + port, dport=1025 + port))
+ pkts.append(p)
+
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+
+ self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=3001, dport=3002))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.assert_nothing_captured()
+
+ # verify IPFIX logging
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ sleep(1)
+ capture = self.pg2.get_capture(10)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_entries_per_user(
+ data,
+ nat44_config.max_translations_per_user,
+ self.pg0.remote_ip4n)
+
+ def tearDown(self):
+ super(TestNAT44EndpointDependent, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show nat44 addresses"))
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(self.vapi.cli("show nat44 static mappings"))
+ self.logger.info(self.vapi.cli("show nat44 interface address"))
+ self.logger.info(self.vapi.cli("show nat44 sessions detail"))
+ self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
+ self.logger.info(self.vapi.cli("show nat timeouts"))
+ self.clear_nat44()
+ self.vapi.cli("clear logging")
+
+
+class TestNAT44Out2InDPO(MethodHolder):
+ """ NAT44 Test Cases using out2in DPO """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44Out2InDPO, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44Out2InDPO, cls).setUpClass()
+ cls.vapi.cli("set log class nat level debug")
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+ cls.dst_ip4 = '192.168.70.1'
+
+ cls.create_pg_interfaces(range(2))
+