+ def test_port_range(self):
+ """ External address port range """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
+ start_port=1025,
+ end_port=1027)
+
+ pkts = []
+ for port in range(0, 5):
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=1125 + port))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(3)
+ for p in capture:
+ tcp = p[TCP]
+ self.assertGreaterEqual(tcp.sport, 1025)
+ self.assertLessEqual(tcp.sport, 1027)
+
+ def test_ipfix_max_frags(self):
+ """ IPFIX logging maximum fragments pending reassembly exceeded """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat_set_reass(timeout=2, max_reass=1024, max_frag=1,
+ drop_frag=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port,
+ enable=1)
+
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
+ self.tcp_port_in = random.randint(1025, 65535)
+ pkts = self.create_stream_frag(self.pg0,
+ self.pg1.remote_ip4,
+ self.tcp_port_in,
+ 20,
+ data)
+ pkts.reverse()
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.assert_nothing_captured()
+ sleep(1)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(9)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_fragments_ip4(data, 1,
+ self.pg0.remote_ip4n)
+
+ def test_multiple_outside_vrf(self):
+ """ Multiple outside VRF """
+ vrf_id1 = 1
+ vrf_id2 = 2
+
+ self.pg1.unconfig_ip4()
+ self.pg2.unconfig_ip4()
+ self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id1)
+ self.vapi.ip_table_add_del(is_add=1, table_id=vrf_id2)
+ self.pg1.set_table_ip4(vrf_id1)
+ self.pg2.set_table_ip4(vrf_id2)
+ self.pg1.config_ip4()
+ self.pg2.config_ip4()
+ self.pg1.resolve_arp()
+ self.pg2.resolve_arp()
+
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg2.sw_if_index,
+ is_add=1)
+
+ try:
+ # first VRF
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, self.nat_addr)
+
+ pkts = self.create_stream_out(self.pg1, self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ self.tcp_port_in = 60303
+ self.udp_port_in = 60304
+ self.icmp_id_in = 60305
+
+ # second VRF
+ pkts = self.create_stream_in(self.pg0, self.pg2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ self.verify_capture_out(capture, self.nat_addr)
+
+ pkts = self.create_stream_out(self.pg2, self.nat_addr)
+ self.pg2.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ finally:
+ self.nat44_add_address(self.nat_addr, is_add=0)
+ self.pg1.unconfig_ip4()
+ self.pg2.unconfig_ip4()
+ self.pg1.set_table_ip4(0)
+ self.pg2.set_table_ip4(0)
+ self.pg1.config_ip4()
+ self.pg2.config_ip4()
+ self.pg1.resolve_arp()
+ self.pg2.resolve_arp()
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_session_timeout(self):
+ """ NAT44 session timeouts """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat_set_timeouts(udp=5, tcp_established=7440,
+ tcp_transitory=240, icmp=60)
+
+ max_sessions = 1000
+ pkts = []
+ for i in range(0, max_sessions):
+ src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=src, dst=self.pg1.remote_ip4) /
+ UDP(sport=1025, dport=53))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(max_sessions)
+
+ sleep(6)
+
+ pkts = []
+ for i in range(0, max_sessions):
+ src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=src, dst=self.pg1.remote_ip4) /
+ UDP(sport=1026, dport=53))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(max_sessions)
+
+ nsessions = 0
+ users = self.vapi.nat44_user_dump()
+ for user in users:
+ nsessions = nsessions + user.nsessions
+ self.assertLess(nsessions, 2 * max_sessions)
+
+ def test_mss_clamping(self):
+ """ TCP MSS clamping """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="S", options=[('MSS', 1400)]))
+
+ self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ # Negotiated MSS value greater than configured - changed
+ self.verify_mss_value(capture[0], 1000)
+
+ self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ # MSS clamping disabled - negotiated MSS unchanged
+ self.verify_mss_value(capture[0], 1400)
+
+ self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ # Negotiated MSS value smaller than configured - unchanged
+ self.verify_mss_value(capture[0], 1400)
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_ha_send(self):
+ """ Send HA session synchronization events (active) """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
+ port=12345,
+ path_mtu=512)
+ self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
+ port=12346, session_refresh_interval=10)
+ bind_layers(UDP, HANATStateSync, sport=12345)
+
+ # create sessions
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ # active send HA events
+ self.vapi.nat_ha_flush()
+ stats = self.statistics.get_counter('/nat44/ha/add-event-send')
+ self.assertEqual(stats[0][0], 3)
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ self.assert_packet_checksums_valid(p)
+ try:
+ ip = p[IP]
+ udp = p[UDP]
+ hanat = p[HANATStateSync]
+ except IndexError:
+ self.logger.error(ppp("Invalid packet:", p))
+ raise
+ else:
+ self.assertEqual(ip.src, self.pg3.local_ip4)
+ self.assertEqual(ip.dst, self.pg3.remote_ip4)
+ self.assertEqual(udp.sport, 12345)
+ self.assertEqual(udp.dport, 12346)
+ self.assertEqual(hanat.version, 1)
+ self.assertEqual(hanat.thread_index, 0)
+ self.assertEqual(hanat.count, 3)
+ seq = hanat.sequence_number
+ for event in hanat.events:
+ self.assertEqual(event.event_type, 1)
+ self.assertEqual(event.in_addr, self.pg0.remote_ip4)
+ self.assertEqual(event.out_addr, self.nat_addr)
+ self.assertEqual(event.fib_index, 0)
+
+ # ACK received events
+ ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
+ IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
+ UDP(sport=12346, dport=12345) /
+ HANATStateSync(sequence_number=seq, flags='ACK'))
+ self.pg3.add_stream(ack)
+ self.pg_start()
+ stats = self.statistics.get_counter('/nat44/ha/ack-recv')
+ self.assertEqual(stats[0][0], 1)
+
+ # delete one session
+ self.pg_enable_capture(self.pg_interfaces)
+ self.vapi.nat44_del_session(address=self.pg0.remote_ip4n,
+ port=self.tcp_port_in,
+ protocol=IP_PROTOS.tcp,
+ flags=self.config_flags.NAT_IS_INSIDE)
+ self.vapi.nat_ha_flush()
+ stats = self.statistics.get_counter('/nat44/ha/del-event-send')
+ self.assertEqual(stats[0][0], 1)
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ try:
+ hanat = p[HANATStateSync]
+ except IndexError:
+ self.logger.error(ppp("Invalid packet:", p))
+ raise
+ else:
+ self.assertGreater(hanat.sequence_number, seq)
+
+ # do not send ACK, active retry send HA event again
+ self.pg_enable_capture(self.pg_interfaces)
+ sleep(12)
+ stats = self.statistics.get_counter('/nat44/ha/retry-count')
+ self.assertEqual(stats[0][0], 3)
+ stats = self.statistics.get_counter('/nat44/ha/missed-count')
+ self.assertEqual(stats[0][0], 1)
+ capture = self.pg3.get_capture(3)
+ for packet in capture:
+ self.assertEqual(packet, p)
+
+ # session counters refresh
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(2)
+ self.vapi.nat_ha_flush()
+ stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
+ self.assertEqual(stats[0][0], 2)
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ self.assert_packet_checksums_valid(p)
+ try:
+ ip = p[IP]
+ udp = p[UDP]
+ hanat = p[HANATStateSync]
+ except IndexError:
+ self.logger.error(ppp("Invalid packet:", p))
+ raise
+ else:
+ self.assertEqual(ip.src, self.pg3.local_ip4)
+ self.assertEqual(ip.dst, self.pg3.remote_ip4)
+ self.assertEqual(udp.sport, 12345)
+ self.assertEqual(udp.dport, 12346)
+ self.assertEqual(hanat.version, 1)
+ self.assertEqual(hanat.count, 2)
+ seq = hanat.sequence_number
+ for event in hanat.events:
+ self.assertEqual(event.event_type, 3)
+ self.assertEqual(event.out_addr, self.nat_addr)
+ self.assertEqual(event.fib_index, 0)
+ self.assertEqual(event.total_pkts, 2)
+ self.assertGreater(event.total_bytes, 0)
+
+ ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
+ IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
+ UDP(sport=12346, dport=12345) /
+ HANATStateSync(sequence_number=seq, flags='ACK'))
+ self.pg3.add_stream(ack)
+ self.pg_start()
+ stats = self.statistics.get_counter('/nat44/ha/ack-recv')
+ self.assertEqual(stats[0][0], 2)
+
+ def test_ha_recv(self):
+ """ Receive HA session synchronization events (passive) """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
+ port=12345,
+ path_mtu=512)
+ bind_layers(UDP, HANATStateSync, sport=12345)
+
+ self.tcp_port_out = random.randint(1025, 65535)
+ self.udp_port_out = random.randint(1025, 65535)
+
+ # send HA session add events to failover/passive
+ p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
+ IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
+ UDP(sport=12346, dport=12345) /
+ HANATStateSync(sequence_number=1, events=[
+ Event(event_type='add', protocol='tcp',
+ in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
+ in_port=self.tcp_port_in, out_port=self.tcp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.tcp_external_port,
+ ehn_port=self.tcp_external_port, fib_index=0),
+ Event(event_type='add', protocol='udp',
+ in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
+ in_port=self.udp_port_in, out_port=self.udp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.udp_external_port,
+ ehn_port=self.udp_external_port, fib_index=0)]))
+
+ self.pg3.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ # receive ACK
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ try:
+ hanat = p[HANATStateSync]
+ except IndexError:
+ self.logger.error(ppp("Invalid packet:", p))
+ raise
+ else:
+ self.assertEqual(hanat.sequence_number, 1)
+ self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.version, 1)
+ self.assertEqual(hanat.thread_index, 0)
+ stats = self.statistics.get_counter('/nat44/ha/ack-send')
+ self.assertEqual(stats[0][0], 1)
+ stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
+ self.assertEqual(stats[0][0], 2)
+ users = self.statistics.get_counter('/nat44/total-users')
+ self.assertEqual(users[0][0], 1)
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 2)
+ users = self.vapi.nat44_user_dump()
+ self.assertEqual(len(users), 1)
+ self.assertEqual(str(users[0].ip_address),
+ self.pg0.remote_ip4)
+ # there should be 2 sessions created by HA
+ sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
+ users[0].vrf_id)
+ self.assertEqual(len(sessions), 2)
+ for session in sessions:
+ self.assertEqual(str(session.inside_ip_address),
+ self.pg0.remote_ip4)
+ self.assertEqual(str(session.outside_ip_address),
+ self.nat_addr)
+ self.assertIn(session.inside_port,
+ [self.tcp_port_in, self.udp_port_in])
+ self.assertIn(session.outside_port,
+ [self.tcp_port_out, self.udp_port_out])
+ self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
+
+ # send HA session delete event to failover/passive
+ p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
+ IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
+ UDP(sport=12346, dport=12345) /
+ HANATStateSync(sequence_number=2, events=[
+ Event(event_type='del', protocol='udp',
+ in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
+ in_port=self.udp_port_in, out_port=self.udp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.udp_external_port,
+ ehn_port=self.udp_external_port, fib_index=0)]))
+
+ self.pg3.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ # receive ACK
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ try:
+ hanat = p[HANATStateSync]
+ except IndexError:
+ self.logger.error(ppp("Invalid packet:", p))
+ raise
+ else:
+ self.assertEqual(hanat.sequence_number, 2)
+ self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.version, 1)
+ users = self.vapi.nat44_user_dump()
+ self.assertEqual(len(users), 1)
+ self.assertEqual(str(users[0].ip_address),
+ self.pg0.remote_ip4)
+ # now we should have only 1 session, 1 deleted by HA
+ sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
+ users[0].vrf_id)
+ self.assertEqual(len(sessions), 1)
+ stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
+ self.assertEqual(stats[0][0], 1)
+
+ stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
+ self.assertEqual(stats, 2)
+
+ # send HA session refresh event to failover/passive
+ p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
+ IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
+ UDP(sport=12346, dport=12345) /
+ HANATStateSync(sequence_number=3, events=[
+ Event(event_type='refresh', protocol='tcp',
+ in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
+ in_port=self.tcp_port_in, out_port=self.tcp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.tcp_external_port,
+ ehn_port=self.tcp_external_port, fib_index=0,
+ total_bytes=1024, total_pkts=2)]))
+ self.pg3.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ # receive ACK
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ try:
+ hanat = p[HANATStateSync]
+ except IndexError:
+ self.logger.error(ppp("Invalid packet:", p))
+ raise
+ else:
+ self.assertEqual(hanat.sequence_number, 3)
+ self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.version, 1)
+ users = self.vapi.nat44_user_dump()
+ self.assertEqual(len(users), 1)
+ self.assertEqual(str(users[0].ip_address),
+ self.pg0.remote_ip4)
+ sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
+ users[0].vrf_id)
+ self.assertEqual(len(sessions), 1)
+ session = sessions[0]
+ self.assertEqual(session.total_bytes, 1024)
+ self.assertEqual(session.total_pkts, 2)
+ stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
+ self.assertEqual(stats[0][0], 1)
+
+ stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
+ self.assertEqual(stats, 3)
+
+ # send packet to test session created by HA
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ except IndexError:
+ self.logger.error(ppp("Invalid packet:", p))
+ raise
+ else:
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.sport, self.tcp_external_port)
+ self.assertEqual(tcp.dport, self.tcp_port_in)
+
+ def tearDown(self):
+ super(TestNAT44, self).tearDown()
+ self.clear_nat44()
+ self.vapi.cli("clear logging")
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show nat44 addresses"))
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(self.vapi.cli("show nat44 static mappings"))
+ self.logger.info(self.vapi.cli("show nat44 interface address"))
+ self.logger.info(self.vapi.cli("show nat44 sessions detail"))
+ self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
+ self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
+ self.logger.info(self.vapi.cli("show nat timeouts"))
+ self.logger.info(
+ self.vapi.cli("show nat addr-port-assignment-alg"))
+ self.logger.info(self.vapi.cli("show nat ha"))
+
+
+class TestNAT44EndpointDependent(MethodHolder):
+ """ Endpoint-Dependent mapping and filtering test cases """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44EndpointDependent, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44EndpointDependent, cls).setUpClass()
+ cls.vapi.cli("set log class nat level debug")
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.nat_addr = '10.0.0.3'
+ cls.ipfix_src_port = 4739
+ cls.ipfix_domain_id = 1
+ cls.tcp_external_port = 80
+
+ cls.create_pg_interfaces(range(7))
+ cls.interfaces = list(cls.pg_interfaces[0:3])
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.pg0.generate_remote_hosts(3)
+ cls.pg0.configure_ipv4_neighbors()
+
+ cls.pg3.admin_up()
+
+ cls.pg4.generate_remote_hosts(2)
+ cls.pg4.config_ip4()
+ ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
+ cls.vapi.sw_interface_add_del_address(
+ sw_if_index=cls.pg4.sw_if_index, address=ip_addr_n,
+ address_length=24)
+ cls.pg4.admin_up()
+ cls.pg4.resolve_arp()
+ cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
+ cls.pg4.resolve_arp()
+
+ zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
+ cls.vapi.ip_table_add_del(is_add=1, table_id=1)
+
+ cls.pg5._local_ip4 = "10.1.1.1"
+ cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
+ cls.pg5.local_ip4)
+ cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
+ cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
+ socket.AF_INET, cls.pg5.remote_ip4)
+ cls.pg5.set_table_ip4(1)
+ cls.pg5.config_ip4()
+ cls.pg5.admin_up()
+ r1 = VppIpRoute(cls, cls.pg5.remote_ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ cls.pg5.sw_if_index)],
+ table_id=1,
+ register=False)
+ r1.add_vpp_config()
+
+ cls.pg6._local_ip4 = "10.1.2.1"
+ cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
+ cls.pg6.local_ip4)
+ cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
+ cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
+ socket.AF_INET, cls.pg6.remote_ip4)
+ cls.pg6.set_table_ip4(1)
+ cls.pg6.config_ip4()
+ cls.pg6.admin_up()
+
+ r2 = VppIpRoute(cls, cls.pg6.remote_ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ cls.pg6.sw_if_index)],
+ table_id=1,
+ register=False)
+ r3 = VppIpRoute(cls, cls.pg6.remote_ip4, 16,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=1)],
+ table_id=0,
+ register=False)
+ r4 = VppIpRoute(cls, "0.0.0.0", 0,
+ [VppRoutePath("0.0.0.0", 0xffffffff,
+ nh_table_id=0)],
+ table_id=1,
+ register=False)
+ r5 = VppIpRoute(cls, "0.0.0.0", 0,
+ [VppRoutePath(cls.pg1.local_ip4,
+ cls.pg1.sw_if_index)],
+ register=False)
+ r2.add_vpp_config()
+ r3.add_vpp_config()
+ r4.add_vpp_config()
+ r5.add_vpp_config()
+
+ cls.pg5.resolve_arp()
+ cls.pg6.resolve_arp()
+
+ except Exception:
+ super(TestNAT44EndpointDependent, cls).tearDownClass()
+ raise
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestNAT44EndpointDependent, cls).tearDownClass()
+
+ def test_frag_in_order(self):
+ """ NAT44 translate fragments arriving in order """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.frag_in_order(proto=IP_PROTOS.tcp)
+ self.frag_in_order(proto=IP_PROTOS.udp)
+ self.frag_in_order(proto=IP_PROTOS.icmp)
+
+ def test_frag_in_order_dont_translate(self):
+ """ NAT44 don't translate fragments arriving in order """
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_forwarding_enable_disable(enable=True)
+ self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True)
+
+ def test_frag_out_of_order(self):
+ """ NAT44 translate fragments arriving out of order """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.frag_out_of_order(proto=IP_PROTOS.tcp)
+ self.frag_out_of_order(proto=IP_PROTOS.udp)
+ self.frag_out_of_order(proto=IP_PROTOS.icmp)
+
+ def test_frag_out_of_order_dont_translate(self):
+ """ NAT44 don't translate fragments arriving out of order """
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_forwarding_enable_disable(enable=True)
+ self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True)
+
+ def test_frag_in_order_in_plus_out(self):
+ """ in+out interface fragments in order """
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ flags=flags, is_add=1)
+
+ self.server = self.pg1.remote_hosts[0]
+
+ self.server_in_addr = self.server.ip4
+ self.server_out_addr = '11.11.11.11'
+ self.server_in_port = random.randint(1025, 65535)
+ self.server_out_port = random.randint(1025, 65535)
+
+ self.nat44_add_address(self.server_out_addr)
+
+ # add static mappings for server
+ self.nat44_add_static_mapping(self.server_in_addr,
+ self.server_out_addr,
+ self.server_in_port,
+ self.server_out_port,
+ proto=IP_PROTOS.tcp)
+ self.nat44_add_static_mapping(self.server_in_addr,
+ self.server_out_addr,
+ self.server_in_port,
+ self.server_out_port,
+ proto=IP_PROTOS.udp)
+ self.nat44_add_static_mapping(self.server_in_addr,
+ self.server_out_addr,
+ proto=IP_PROTOS.icmp)
+
+ self.vapi.nat_set_reass(timeout=10, max_reass=1024, max_frag=5,
+ drop_frag=0)
+
+ self.frag_in_order_in_plus_out(proto=IP_PROTOS.tcp)
+ self.frag_in_order_in_plus_out(proto=IP_PROTOS.udp)
+ self.frag_in_order_in_plus_out(proto=IP_PROTOS.icmp)
+
+ def test_frag_out_of_order_in_plus_out(self):
+ """ in+out interface fragments out of order """
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ flags=flags, is_add=1)
+
+ self.server = self.pg1.remote_hosts[0]
+
+ self.server_in_addr = self.server.ip4
+ self.server_out_addr = '11.11.11.11'
+ self.server_in_port = random.randint(1025, 65535)
+ self.server_out_port = random.randint(1025, 65535)
+
+ self.nat44_add_address(self.server_out_addr)
+
+ # add static mappings for server
+ self.nat44_add_static_mapping(self.server_in_addr,
+ self.server_out_addr,
+ self.server_in_port,
+ self.server_out_port,
+ proto=IP_PROTOS.tcp)
+ self.nat44_add_static_mapping(self.server_in_addr,
+ self.server_out_addr,
+ self.server_in_port,
+ self.server_out_port,
+ proto=IP_PROTOS.udp)
+ self.nat44_add_static_mapping(self.server_in_addr,
+ self.server_out_addr,
+ proto=IP_PROTOS.icmp)
+
+ self.vapi.nat_set_reass(timeout=10, max_reass=1024, max_frag=5,
+ drop_frag=0)
+
+ self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.tcp)
+ self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.udp)
+ self.frag_out_of_order_in_plus_out(proto=IP_PROTOS.icmp)
+
+ def test_reass_hairpinning(self):
+ """ NAT44 fragments hairpinning """
+ self.server = self.pg0.remote_hosts[1]
+ self.host_in_port = random.randint(1025, 65535)
+ self.server_in_port = random.randint(1025, 65535)
+ self.server_out_port = random.randint(1025, 65535)
+
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ # add static mapping for server
+ self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
+ self.server_in_port,
+ self.server_out_port,
+ proto=IP_PROTOS.tcp)
+ self.nat44_add_static_mapping(self.server.ip4, self.nat_addr,
+ self.server_in_port,
+ self.server_out_port,
+ proto=IP_PROTOS.udp)
+ self.nat44_add_static_mapping(self.server.ip4, self.nat_addr)
+
+ self.reass_hairpinning(proto=IP_PROTOS.tcp)
+ self.reass_hairpinning(proto=IP_PROTOS.udp)
+ self.reass_hairpinning(proto=IP_PROTOS.icmp)
+
+ def test_dynamic(self):
+ """ NAT44 dynamic translation test """
+
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(1, nat_config.endpoint_dependent)
+
+ # in2out
+ tcpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/TCP packets')
+ udpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/UDP packets')
+ icmpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/ICMP packets')
+ totaln = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/good in2out packets processed')
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/TCP packets')
+ self.assertEqual(err - tcpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/UDP packets')
+ self.assertEqual(err - udpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/ICMP packets')
+ self.assertEqual(err - icmpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-in2out-slowpath/good in2out packets processed')
+ self.assertEqual(err - totaln, 3)
+
+ # out2in
+ tcpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/TCP packets')
+ udpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/UDP packets')
+ icmpn = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in-slowpath/ICMP packets')
+ totaln = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/good out2in packets processed')
+
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/TCP packets')
+ self.assertEqual(err - tcpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/UDP packets')
+ self.assertEqual(err - udpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in-slowpath/ICMP packets')
+ self.assertEqual(err - icmpn, 1)
+ err = self.statistics.get_err_counter(
+ '/err/nat44-ed-out2in/good out2in packets processed')
+ self.assertEqual(err - totaln, 2)
+
+ users = self.statistics.get_counter('/nat44/total-users')
+ self.assertEqual(users[0][0], 1)
+ sessions = self.statistics.get_counter('/nat44/total-sessions')
+ self.assertEqual(sessions[0][0], 3)
+
+ def test_forwarding(self):
+ """ NAT44 forwarding test """
+
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+
+ real_ip = self.pg0.remote_ip4
+ alias_ip = self.nat_addr
+ flags = self.config_flags.NAT_IS_ADDR_ONLY
+ self.vapi.nat44_add_del_static_mapping(is_add=1,
+ local_ip_address=real_ip,
+ external_ip_address=alias_ip,
+ external_sw_if_index=0xFFFFFFFF,
+ flags=flags)
+
+ try:
+ # in2out - static mapping match
+
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, same_port=True)
+
+ # in2out - no static mapping match
+
+ host0 = self.pg0.remote_hosts[0]
+ self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
+ try:
+ pkts = self.create_stream_out(self.pg1,
+ dst_ip=self.pg0.remote_ip4,
+ use_inside_ports=True)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
+ same_port=True)
+ finally:
+ self.pg0.remote_hosts[0] = host0
+
+ user = self.pg0.remote_hosts[1]
+ sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
+ self.assertEqual(len(sessions), 3)
+ self.assertTrue(sessions[0].flags &
+ self.config_flags.NAT_IS_EXT_HOST_VALID)
+ self.vapi.nat44_del_session(
+ address=sessions[0].inside_ip_address,
+ port=sessions[0].inside_port,
+ protocol=sessions[0].protocol,
+ flags=(self.config_flags.NAT_IS_INSIDE |
+ self.config_flags.NAT_IS_EXT_HOST_VALID),
+ ext_host_address=sessions[0].ext_host_address,
+ ext_host_port=sessions[0].ext_host_port)
+ sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
+ self.assertEqual(len(sessions), 2)
+
+ finally:
+ self.vapi.nat44_forwarding_enable_disable(enable=0)
+ flags = self.config_flags.NAT_IS_ADDR_ONLY
+ self.vapi.nat44_add_del_static_mapping(
+ is_add=0,
+ local_ip_address=real_ip,
+ external_ip_address=alias_ip,
+ external_sw_if_index=0xFFFFFFFF,
+ flags=flags)
+
+ def test_static_lb(self):
+ """ NAT44 local service load balancing """
+ external_addr_n = self.nat_addr
+ external_port = 80
+ local_port = 8080
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 70,
+ 'vrf_id': 0},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 30,
+ 'vrf_id': 0}]
+
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_add_del_lb_static_mapping(
+ is_add=1,
+ external_addr=external_addr_n,
+ external_port=external_port,
+ protocol=IP_PROTOS.tcp,
+ local_num=len(locals),
+ locals=locals)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ # from client to service
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertIn(ip.dst, [server1.ip4, server2.ip4])
+ if ip.dst == server1.ip4:
+ server = server1
+ else:
+ server = server2
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
+ IP(src=server.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ self.assertEqual(len(sessions), 1)
+ self.assertTrue(sessions[0].flags &
+ self.config_flags.NAT_IS_EXT_HOST_VALID)
+ self.vapi.nat44_del_session(
+ address=sessions[0].inside_ip_address,
+ port=sessions[0].inside_port,
+ protocol=sessions[0].protocol,
+ flags=(self.config_flags.NAT_IS_INSIDE |
+ self.config_flags.NAT_IS_EXT_HOST_VALID),
+ ext_host_address=sessions[0].ext_host_address,
+ ext_host_port=sessions[0].ext_host_port)
+ sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ self.assertEqual(len(sessions), 0)
+
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ def test_static_lb_multi_clients(self):
+ """ NAT44 local service load balancing - multiple clients"""
+
+ external_addr = self.nat_addr
+ external_port = 80
+ local_port = 8080
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+ server3 = self.pg0.remote_hosts[2]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 90,
+ 'vrf_id': 0},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 10,
+ 'vrf_id': 0}]
+
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_add_del_lb_static_mapping(is_add=1,
+ external_addr=external_addr,
+ external_port=external_port,
+ protocol=IP_PROTOS.tcp,
+ local_num=len(locals),
+ locals=locals)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ server1_n = 0
+ server2_n = 0
+ clients = ip4_range(self.pg1.remote_ip4, 10, 50)
+ pkts = []
+ for client in clients:
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=client, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ pkts.append(p)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ if p[IP].dst == server1.ip4:
+ server1_n += 1
+ else:
+ server2_n += 1
+ self.assertGreater(server1_n, server2_n)
+
+ local = {
+ 'addr': server3.ip4n,
+ 'port': local_port,
+ 'probability': 20,
+ 'vrf_id': 0
+ }
+
+ # add new back-end
+ self.vapi.nat44_lb_static_mapping_add_del_local(
+ is_add=1,
+ external_addr=external_addr,
+ external_port=external_port,
+ local=local,
+ protocol=IP_PROTOS.tcp)
+ server1_n = 0
+ server2_n = 0
+ server3_n = 0
+ clients = ip4_range(self.pg1.remote_ip4, 60, 110)
+ pkts = []
+ for client in clients:
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=client, dst=self.nat_addr) /
+ TCP(sport=12346, dport=external_port))
+ pkts.append(p)
+ self.assertGreater(len(pkts), 0)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ if p[IP].dst == server1.ip4:
+ server1_n += 1
+ elif p[IP].dst == server2.ip4:
+ server2_n += 1
+ else:
+ server3_n += 1
+ self.assertGreater(server1_n, 0)
+ self.assertGreater(server2_n, 0)
+ self.assertGreater(server3_n, 0)
+
+ local = {
+ 'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 10,
+ 'vrf_id': 0
+ }
+
+ # remove one back-end
+ self.vapi.nat44_lb_static_mapping_add_del_local(
+ is_add=0,
+ external_addr=external_addr,
+ external_port=external_port,
+ local=local,
+ protocol=IP_PROTOS.tcp)
+ server1_n = 0
+ server2_n = 0
+ server3_n = 0
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ if p[IP].dst == server1.ip4:
+ server1_n += 1
+ elif p[IP].dst == server2.ip4:
+ server2_n += 1
+ else:
+ server3_n += 1
+ self.assertGreater(server1_n, 0)
+ self.assertEqual(server2_n, 0)
+ self.assertGreater(server3_n, 0)
+
+ def test_static_lb_2(self):
+ """ NAT44 local service load balancing (asymmetrical rule) """
+ external_addr = self.nat_addr
+ external_port = 80
+ local_port = 8080
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 70,
+ 'vrf_id': 0},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 30,
+ 'vrf_id': 0}]
+
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags,
+ external_addr=external_addr,
+ external_port=external_port,
+ protocol=IP_PROTOS.tcp,
+ local_num=len(locals),
+ locals=locals)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ # from client to service
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertIn(ip.dst, [server1.ip4, server2.ip4])
+ if ip.dst == server1.ip4:
+ server = server1
+ else:
+ server = server2
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
+ IP(src=server.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client to server (no translation)
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
+ TCP(sport=12346, dport=local_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ server = None
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, server1.ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client (no translation)
+ p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
+ IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12346))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, server1.ip4)
+ self.assertEqual(tcp.sport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_lb_affinity(self):
+ """ NAT44 local service load balancing affinity """
+ external_addr = self.nat_addr
+ external_port = 80
+ local_port = 8080
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+
+ locals = [{'addr': server1.ip4n,
+ 'port': local_port,
+ 'probability': 50,
+ 'vrf_id': 0},
+ {'addr': server2.ip4n,
+ 'port': local_port,
+ 'probability': 50,
+ 'vrf_id': 0}]
+
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_add_del_lb_static_mapping(is_add=1,
+ external_addr=external_addr,
+ external_port=external_port,
+ protocol=IP_PROTOS.tcp,
+ affinity=10800,
+ local_num=len(locals),
+ locals=locals)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=1025, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ backend = capture[0][IP].dst
+
+ sessions = self.vapi.nat44_user_session_dump(backend, 0)
+ self.assertEqual(len(sessions), 1)
+ self.assertTrue(sessions[0].flags &
+ self.config_flags.NAT_IS_EXT_HOST_VALID)
+ self.vapi.nat44_del_session(
+ address=sessions[0].inside_ip_address,
+ port=sessions[0].inside_port,
+ protocol=sessions[0].protocol,
+ flags=(self.config_flags.NAT_IS_INSIDE |
+ self.config_flags.NAT_IS_EXT_HOST_VALID),
+ ext_host_address=sessions[0].ext_host_address,
+ ext_host_port=sessions[0].ext_host_port)
+
+ pkts = []
+ for port in range(1030, 1100):
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=port, dport=external_port))
+ pkts.append(p)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ self.assertEqual(p[IP].dst, backend)
+
+ def test_unknown_proto(self):
+ """ NAT44 translate packet with unknown protocol """
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ # in2out
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=20))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg1.get_capture(1)
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ GRE() /
+ IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg1.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, self.nat_addr)
+ self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+ self.assertEqual(packet.haslayer(GRE), 1)
+ self.assert_packet_checksums_valid(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # out2in
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ GRE() /
+ IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg0.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
+ self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
+ self.assertEqual(packet.haslayer(GRE), 1)
+ self.assert_packet_checksums_valid(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ def test_hairpinning_unknown_proto(self):
+ """ NAT44 translate packet with unknown protocol - hairpinning """
+ host = self.pg0.remote_hosts[0]
+ server = self.pg0.remote_hosts[1]
+ host_in_port = 1234
+ server_out_port = 8765
+ server_nat_ip = "10.0.0.11"
+
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ # add static mapping for server
+ self.nat44_add_static_mapping(server.ip4, server_nat_ip)
+
+ # host to server
+ p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
+ IP(src=host.ip4, dst=server_nat_ip) /
+ TCP(sport=host_in_port, dport=server_out_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
+ IP(src=host.ip4, dst=server_nat_ip) /
+ GRE() /
+ IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg0.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, self.nat_addr)
+ self.assertEqual(packet[IP].dst, server.ip4)
+ self.assertEqual(packet.haslayer(GRE), 1)
+ self.assert_packet_checksums_valid(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server to host
+ p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
+ IP(src=server.ip4, dst=self.nat_addr) /
+ GRE() /
+ IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
+ TCP(sport=1234, dport=1234))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ p = self.pg0.get_capture(1)
+ packet = p[0]
+ try:
+ self.assertEqual(packet[IP].src, server_nat_ip)
+ self.assertEqual(packet[IP].dst, host.ip4)
+ self.assertEqual(packet.haslayer(GRE), 1)
+ self.assert_packet_checksums_valid(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ def test_output_feature_and_service(self):
+ """ NAT44 interface output feature and services """
+ external_addr = '1.2.3.4'
+ external_port = 80
+ local_port = 8080
+
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_ADDR_ONLY
+ self.vapi.nat44_add_del_identity_mapping(
+ ip_address=self.pg1.remote_ip4n, sw_if_index=0xFFFFFFFF,
+ flags=flags, is_add=1)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, flags=flags)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_output_feature(
+ is_add=1,
+ sw_if_index=self.pg1.sw_if_index)
+
+ # from client to service
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=external_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, external_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from local network host to external network
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ # from external network back to local network host
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ def test_output_feature_and_service2(self):
+ """ NAT44 interface output feature and service host direct access """
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_output_feature(
+ is_add=1,
+ sw_if_index=self.pg1.sw_if_index)
+
+ # session initiated from service host - translate
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ # session initiated from remote host - do not translate
+ self.tcp_port_in = 60303
+ self.udp_port_in = 60304
+ self.icmp_id_in = 60305
+ pkts = self.create_stream_out(self.pg1,
+ self.pg0.remote_ip4,
+ use_inside_ports=True)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
+ same_port=True)
+
+ def test_output_feature_and_service3(self):
+ """ NAT44 interface output feature and DST NAT """
+ external_addr = '1.2.3.4'
+ external_port = 80
+ local_port = 8080
+
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, flags=flags)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_output_feature(
+ is_add=1,
+ sw_if_index=self.pg1.sw_if_index)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=external_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg0.remote_ip4)
+ self.assertEqual(tcp.sport, 12345)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, external_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.dport, 12345)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_next_src_nat(self):
+ """ On way back forward packet to nat44-in2out node. """
+ twice_nat_addr = '10.0.1.3'
+ external_port = 80
+ local_port = 8080
+ post_twice_nat_port = 0
+
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ self.nat44_add_address(twice_nat_addr, twice_nat=1)
+ flags = (self.config_flags.NAT_IS_OUT2IN_ONLY |
+ self.config_flags.NAT_IS_SELF_TWICE_NAT)
+ self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, vrf_id=1,
+ flags=flags)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg6.sw_if_index,
+ is_add=1)
+
+ p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
+ IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=12345, dport=external_port))
+ self.pg6.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, twice_nat_addr)
+ self.assertNotEqual(tcp.sport, 12345)
+ post_twice_nat_port = tcp.sport
+ self.assertEqual(ip.dst, self.pg6.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
+ IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
+ TCP(sport=local_port, dport=post_twice_nat_port))
+ self.pg6.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(tcp.sport, external_port)
+ self.assertEqual(ip.dst, self.pg6.remote_ip4)
+ self.assertEqual(tcp.dport, 12345)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
+ client_id=None):
+ twice_nat_addr = '10.0.1.3'
+
+ port_in = 8080
+ if lb:
+ if not same_pg:
+ port_in1 = port_in
+ port_in2 = port_in
+ else:
+ port_in1 = port_in + 1
+ port_in2 = port_in + 2
+
+ port_out = 80
+ eh_port_out = 4567
+
+ server1 = self.pg0.remote_hosts[0]
+ server2 = self.pg0.remote_hosts[1]
+ if lb and same_pg:
+ server2 = server1
+ if not lb:
+ server = server1
+
+ pg0 = self.pg0
+ if same_pg:
+ pg1 = self.pg0
+ else:
+ pg1 = self.pg1
+
+ eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
+ client_id == 1)
+
+ self.nat44_add_address(self.nat_addr)
+ self.nat44_add_address(twice_nat_addr, twice_nat=1)
+
+ flags = 0
+ if self_twice_nat:
+ flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT
+ else:
+ flags |= self.config_flags.NAT_IS_TWICE_NAT
+
+ if not lb:
+ self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
+ port_in, port_out,
+ proto=IP_PROTOS.tcp,
+ flags=flags)
+ else:
+ locals = [{'addr': server1.ip4n,
+ 'port': port_in1,
+ 'probability': 50,
+ 'vrf_id': 0},
+ {'addr': server2.ip4n,
+ 'port': port_in2,
+ 'probability': 50,
+ 'vrf_id': 0}]
+ out_addr = self.nat_addr
+
+ self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags,
+ external_addr=out_addr,
+ external_port=port_out,
+ protocol=IP_PROTOS.tcp,
+ local_num=len(locals),
+ locals=locals)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=pg1.sw_if_index,
+ is_add=1)
+
+ if same_pg:
+ if not lb:
+ client = server
+ else:
+ assert client_id is not None
+ if client_id == 1:
+ client = self.pg0.remote_hosts[0]
+ elif client_id == 2:
+ client = self.pg0.remote_hosts[1]
+ else:
+ client = pg1.remote_hosts[0]
+ p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
+ IP(src=client.ip4, dst=self.nat_addr) /
+ TCP(sport=eh_port_out, dport=port_out))
+ pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ if lb:
+ if ip.dst == server1.ip4:
+ server = server1
+ port_in = port_in1
+ else:
+ server = server2
+ port_in = port_in2
+ self.assertEqual(ip.dst, server.ip4)
+ if lb and same_pg:
+ self.assertIn(tcp.dport, [port_in1, port_in2])
+ else:
+ self.assertEqual(tcp.dport, port_in)
+ if eh_translate:
+ self.assertEqual(ip.src, twice_nat_addr)
+ self.assertNotEqual(tcp.sport, eh_port_out)
+ else:
+ self.assertEqual(ip.src, client.ip4)
+ self.assertEqual(tcp.sport, eh_port_out)
+ eh_addr_in = ip.src
+ eh_port_in = tcp.sport
+ saved_port_in = tcp.dport
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ p = (Ether(src=server.mac, dst=pg0.local_mac) /
+ IP(src=server.ip4, dst=eh_addr_in) /
+ TCP(sport=saved_port_in, dport=eh_port_in))
+ pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, client.ip4)
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.dport, eh_port_out)
+ self.assertEqual(tcp.sport, port_out)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ if eh_translate:
+ sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ self.assertEqual(len(sessions), 1)
+ self.assertTrue(sessions[0].flags &
+ self.config_flags.NAT_IS_EXT_HOST_VALID)
+ self.assertTrue(sessions[0].flags &
+ self.config_flags.NAT_IS_TWICE_NAT)
+ self.logger.error(self.vapi.cli("show nat44 sessions detail"))
+ self.vapi.nat44_del_session(
+ address=sessions[0].inside_ip_address,
+ port=sessions[0].inside_port,
+ protocol=sessions[0].protocol,
+ flags=(self.config_flags.NAT_IS_INSIDE |
+ self.config_flags.NAT_IS_EXT_HOST_VALID),
+ ext_host_address=sessions[0].ext_host_nat_address,
+ ext_host_port=sessions[0].ext_host_nat_port)
+ sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
+ self.assertEqual(len(sessions), 0)
+
+ def test_twice_nat(self):
+ """ Twice NAT44 """
+ self.twice_nat_common()
+
+ def test_self_twice_nat_positive(self):
+ """ Self Twice NAT44 (positive test) """
+ self.twice_nat_common(self_twice_nat=True, same_pg=True)
+
+ def test_self_twice_nat_negative(self):
+ """ Self Twice NAT44 (negative test) """
+ self.twice_nat_common(self_twice_nat=True)
+
+ def test_twice_nat_lb(self):
+ """ Twice NAT44 local service load balancing """
+ self.twice_nat_common(lb=True)
+
+ def test_self_twice_nat_lb_positive(self):
+ """ Self Twice NAT44 local service load balancing (positive test) """
+ self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+ client_id=1)
+
+ def test_self_twice_nat_lb_negative(self):
+ """ Self Twice NAT44 local service load balancing (negative test) """
+ self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+ client_id=2)
+
+ def test_twice_nat_interface_addr(self):
+ """ Acquire twice NAT44 addresses from interface """
+ flags = self.config_flags.NAT_IS_TWICE_NAT
+ self.vapi.nat44_add_del_interface_addr(
+ is_add=1,
+ sw_if_index=self.pg3.sw_if_index,
+ flags=flags)
+
+ # no address in NAT pool
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT address pool
+ self.pg3.config_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(1, len(adresses))
+ self.assertEqual(str(adresses[0].ip_address),
+ self.pg3.local_ip4)
+ self.assertEqual(adresses[0].flags, flags)
+
+ # remove interface address and check NAT address pool
+ self.pg3.unconfig_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ def test_tcp_close(self):
+ """ Close TCP session from inside network - output feature """
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ self.nat44_add_address(self.pg1.local_ip4)
+ twice_nat_addr = '10.0.1.3'
+ service_ip = '192.168.16.150'
+ self.nat44_add_address(twice_nat_addr, twice_nat=1)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_output_feature(
+ is_add=1,
+ sw_if_index=self.pg1.sw_if_index)
+ flags = (self.config_flags.NAT_IS_OUT2IN_ONLY |
+ self.config_flags.NAT_IS_TWICE_NAT)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4,
+ service_ip,
+ 80,
+ 80,
+ proto=IP_PROTOS.tcp,
+ flags=flags)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ start_sessnum = len(sessions)
+
+ # SYN packet out->in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=service_ip) /
+ TCP(sport=33898, dport=80, flags="S"))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ tcp_port = p[TCP].sport
+
+ # SYN + ACK packet in->out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
+ TCP(sport=80, dport=tcp_port, flags="SA"))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # ACK packet out->in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=service_ip) /
+ TCP(sport=33898, dport=80, flags="A"))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
+ TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # FIN+ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=service_ip) /
+ TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
+ TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 0)
+
+ def test_tcp_session_close_in(self):
+ """ Close TCP session from inside network """
+ self.tcp_port_out = 10505
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_TWICE_NAT
+ self.nat44_add_static_mapping(self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ flags=flags)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ start_sessnum = len(sessions)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=100, ack=300))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ pkts = []
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=300, ack=101))
+ pkts.append(p)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=300, ack=101))
+ pkts.append(p)
+
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(2)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A", seq=101, ack=301))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 0)
+
+ def test_tcp_session_close_out(self):
+ """ Close TCP session from outside network """
+ self.tcp_port_out = 10505
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_TWICE_NAT
+ self.nat44_add_static_mapping(self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ flags=flags)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ start_sessnum = len(sessions)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=100, ack=300))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # FIN+ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=300, ack=101))
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=101, ack=301))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 0)
+
+ def test_tcp_session_close_simultaneous(self):
+ """ Close TCP session from inside network """
+ self.tcp_port_out = 10505
+ self.nat44_add_address(self.nat_addr)
+ flags = self.config_flags.NAT_IS_TWICE_NAT
+ self.nat44_add_static_mapping(self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ flags=flags)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ start_sessnum = len(sessions)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=100, ack=300))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=300, ack=100))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A", seq=101, ack=301))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=301, ack=101))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 0)
+
+ def test_one_armed_nat44_static(self):
+ """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
+ remote_host = self.pg4.remote_hosts[0]
+ local_host = self.pg4.remote_hosts[1]
+ external_port = 80
+ local_port = 8080
+ eh_port_in = 0
+
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ self.nat44_add_address(self.nat_addr, twice_nat=1)
+ flags = (self.config_flags.NAT_IS_OUT2IN_ONLY |
+ self.config_flags.NAT_IS_TWICE_NAT)
+ self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, flags=flags)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg4.sw_if_index,
+ is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg4.sw_if_index,
+ flags=flags, is_add=1)
+
+ # from client to service
+ p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
+ IP(src=remote_host.ip4, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg4.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg4.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, local_host.ip4)
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.dport, local_port)
+ self.assertNotEqual(tcp.sport, 12345)
+ eh_port_in = tcp.sport
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
+ IP(src=local_host.ip4, dst=self.nat_addr) /
+ TCP(sport=local_port, dport=eh_port_in))
+ self.pg4.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg4.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(ip.dst, remote_host.ip4)
+ self.assertEqual(tcp.sport, external_port)
+ self.assertEqual(tcp.dport, 12345)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_static_with_port_out2(self):
+ """ 1:1 NAPT asymmetrical rule """
+
+ external_port = 80
+ local_port = 8080
+
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ flags = self.config_flags.NAT_IS_OUT2IN_ONLY
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
+ local_port, external_port,
+ proto=IP_PROTOS.tcp, flags=flags)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ # from client to service
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=12345, dport=external_port))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.dst, self.pg0.remote_ip4)
+ self.assertEqual(tcp.dport, local_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # ICMP error
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP(type=11) / capture[0][IP])
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ self.assertEqual(p[IP].src, self.nat_addr)
+ inner = p[IPerror]
+ self.assertEqual(inner.dst, self.nat_addr)
+ self.assertEqual(inner[TCPerror].dport, external_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from service back to client
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=local_port, dport=12345))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(tcp.sport, external_port)
+ self.assert_packet_checksums_valid(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # ICMP error
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ ICMP(type=11) / capture[0][IP])
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
+ inner = p[IPerror]
+ self.assertEqual(inner.src, self.pg0.remote_ip4)
+ self.assertEqual(inner[TCPerror].sport, local_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # from client to server (no translation)
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /