import unittest
from io import BytesIO
-from random import randint, shuffle, choice
+from random import randint, choice
import scapy.compat
from framework import VppTestCase, VppTestRunner
from vpp_acl import AclRule, VppAcl, VppAclInterface
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_papi import VppEnum
+from util import StatsDiff
-class NAT44EDTestCase(VppTestCase):
+class TestNAT44ED(VppTestCase):
+ """ NAT44ED Test Case """
- nat_addr = '10.0.0.3'
+ nat_addr = '10.0.10.3'
tcp_port_in = 6303
tcp_port_out = 6303
max_sessions = 100
def setUp(self):
- super(NAT44EDTestCase, self).setUp()
+ super().setUp()
self.plugin_enable()
def tearDown(self):
- super(NAT44EDTestCase, self).tearDown()
+ super().tearDown()
if not self.vpp_dead:
self.plugin_disable()
@classmethod
def setUpClass(cls):
- super(NAT44EDTestCase, cls).setUpClass()
+ super().setUpClass()
cls.create_pg_interfaces(range(12))
cls.interfaces = list(cls.pg_interfaces[:4])
for r in rl:
r.add_vpp_config()
+ cls.no_diff = StatsDiff({
+ pg.sw_if_index: {
+ '/nat44-ed/in2out/fastpath/tcp': 0,
+ '/nat44-ed/in2out/fastpath/udp': 0,
+ '/nat44-ed/in2out/fastpath/icmp': 0,
+ '/nat44-ed/in2out/fastpath/drops': 0,
+ '/nat44-ed/in2out/slowpath/tcp': 0,
+ '/nat44-ed/in2out/slowpath/udp': 0,
+ '/nat44-ed/in2out/slowpath/icmp': 0,
+ '/nat44-ed/in2out/slowpath/drops': 0,
+ '/nat44-ed/in2out/fastpath/tcp': 0,
+ '/nat44-ed/in2out/fastpath/udp': 0,
+ '/nat44-ed/in2out/fastpath/icmp': 0,
+ '/nat44-ed/in2out/fastpath/drops': 0,
+ '/nat44-ed/in2out/slowpath/tcp': 0,
+ '/nat44-ed/in2out/slowpath/udp': 0,
+ '/nat44-ed/in2out/slowpath/icmp': 0,
+ '/nat44-ed/in2out/slowpath/drops': 0,
+ }
+ for pg in cls.pg_interfaces
+ })
+
def get_err_counter(self, path):
return self.statistics.get_err_counter(path)
sessions = self.vapi.nat44_user_session_dump(server.ip4, 0)
self.assertEqual(len(sessions), 0)
- def verify_syslog_sess(self, data, is_add=True, is_ip6=False):
+ def verify_syslog_sess(self, data, msgid, is_ip6=False):
message = data.decode('utf-8')
try:
message = SyslogMessage.parse(message)
else:
self.assertEqual(message.severity, SyslogSeverity.info)
self.assertEqual(message.appname, 'NAT')
- self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL')
+ self.assertEqual(message.msgid, msgid)
sd_params = message.sd.get('nsess')
self.assertTrue(sd_params is not None)
if is_ip6:
self.assertEqual(sd_params.get('XDPORT'),
"%d" % self.tcp_external_port)
-
-class TestNAT44ED(NAT44EDTestCase):
- """ NAT44ED Test Case """
-
def test_icmp_error(self):
""" NAT44ED test ICMP error message with inner header"""
vpp_worker_count = 4
max_sessions = 5000
- @unittest.skip('MW fix required')
- def test_users_dump(self):
- """ NAT44ED API test - nat44_user_dump """
-
- @unittest.skip('MW fix required')
- def test_frag_out_of_order_do_not_translate(self):
- """ NAT44ED don't translate fragments arriving out of order """
-
- @unittest.skip('MW fix required')
- def test_forwarding(self):
- """ NAT44ED forwarding test """
-
- @unittest.skip('MW fix required')
- def test_twice_nat(self):
- """ NAT44ED Twice NAT """
-
- @unittest.skip('MW fix required')
- def test_twice_nat_lb(self):
- """ NAT44ED Twice NAT local service load balancing """
-
- @unittest.skip('MW fix required')
- def test_output_feature(self):
- """ NAT44ED interface output feature (in2out postrouting) """
-
- @unittest.skip('MW fix required')
- def test_static_with_port_out2(self):
- """ NAT44ED 1:1 NAPT asymmetrical rule """
-
- @unittest.skip('MW fix required')
- def test_output_feature_and_service2(self):
- """ NAT44ED interface output feature and service host direct access """
-
- @unittest.skip('MW fix required')
- def test_static_lb(self):
- """ NAT44ED local service load balancing """
-
- @unittest.skip('MW fix required')
- def test_static_lb_2(self):
- """ NAT44ED local service load balancing (asymmetrical rule) """
-
- @unittest.skip('MW fix required')
- def test_lb_affinity(self):
- """ NAT44ED local service load balancing affinity """
-
- @unittest.skip('MW fix required')
- def test_multiple_vrf(self):
- """ NAT44ED Multiple VRF setup """
-
- @unittest.skip('MW fix required')
- def test_self_twice_nat_positive(self):
- """ NAT44ED Self Twice NAT (positive test) """
-
- @unittest.skip('MW fix required')
- def test_self_twice_nat_lb_positive(self):
- """ NAT44ED Self Twice NAT local service load balancing (positive test)
- """
-
def test_dynamic(self):
""" NAT44ED dynamic translation test """
pkt_count = 1500
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(pkt_count * 3)
+ capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
if_idx = self.pg0.sw_if_index
tc2 = self.statistics['/nat44-ed/in2out/slowpath/tcp']
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.get_capture(len(pkts))
- self.sleep(1.5, "wait for timeouts")
+ self.virtual_sleep(1.5, "wait for timeouts")
pkts = []
for i in range(0, self.max_sessions - 1):
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="R"))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ self.virtual_sleep(6)
- self.sleep(6)
+ # The session is already closed
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="P"))
+ self.send_and_assert_no_replies(self.pg0, p, self.pg1)
+ # The session can be re-opened
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in + 1, dport=self.tcp_external_port + 1,
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="S"))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1)
- def test_dynamic_out_of_ports(self):
- """ NAT44ED dynamic translation test: out of ports """
+ def test_session_rst_established_timeout(self):
+ """ NAT44ED session RST timeouts """
+ self.nat_add_address(self.nat_addr)
self.nat_add_inside_interface(self.pg0)
self.nat_add_outside_interface(self.pg1)
- # in2out and no NAT addresses added
- err_old = self.statistics.get_err_counter(
- '/err/nat44-ed-in2out-slowpath/out of ports')
+ self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
+ tcp_transitory=5, icmp=60)
- pkts = self.create_stream_in(self.pg0, self.pg1)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(0, timeout=1)
+ self.init_tcp_session(self.pg0, self.pg1, self.tcp_port_in,
+ self.tcp_external_port)
- err_new = self.statistics.get_err_counter(
- '/err/nat44-ed-in2out-slowpath/out of ports')
+ # Wait at least the transitory time, the session is in established
+ # state anyway. RST followed by a data packet should keep it
+ # established.
+ self.virtual_sleep(6)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="R"))
+ self.send_and_expect(self.pg0, p, self.pg1)
- self.assertEqual(err_new - err_old, len(pkts))
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="P"))
+ self.send_and_expect(self.pg0, p, self.pg1)
- # in2out after NAT addresses added
- self.nat_add_address(self.nat_addr)
+ # State is established, session should be still open after 6 seconds
+ self.virtual_sleep(6)
- err_old = self.statistics.get_err_counter(
- '/err/nat44-ed-in2out-slowpath/out of ports')
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="R"))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # State is transitory, session should be closed after 6 seconds
+ self.virtual_sleep(6)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="P"))
+ self.send_and_assert_no_replies(self.pg0, p, self.pg1)
+
+ def test_dynamic_out_of_ports(self):
+ """ NAT44ED dynamic translation test: out of ports """
+
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # in2out and no NAT addresses added
pkts = self.create_stream_in(self.pg0, self.pg1)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out(capture, ignore_port=True)
- err_new = self.statistics.get_err_counter(
- '/err/nat44-ed-in2out-slowpath/out of ports')
+ self.send_and_assert_no_replies(
+ self.pg0, pkts, msg="i2o pkts",
+ stats_diff=self.no_diff | {
+ "err": {
+ '/err/nat44-ed-in2out-slowpath/out of ports': len(pkts),
+ },
+ self.pg0.sw_if_index: {
+ '/nat44-ed/in2out/slowpath/drops': len(pkts),
+ },
+ }
+ )
- self.assertEqual(err_new, err_old)
+ # in2out after NAT addresses added
+ self.nat_add_address(self.nat_addr)
+
+ tcpn, udpn, icmpn = (sum(x) for x in
+ zip(*((TCP in p, UDP in p, ICMP in p)
+ for p in pkts)))
+
+ self.send_and_expect(
+ self.pg0, pkts, self.pg1, msg="i2o pkts",
+ stats_diff=self.no_diff | {
+ "err": {
+ '/err/nat44-ed-in2out-slowpath/out of ports': 0,
+ },
+ self.pg0.sw_if_index: {
+ '/nat44-ed/in2out/slowpath/drops': 0,
+ '/nat44-ed/in2out/slowpath/tcp': tcpn,
+ '/nat44-ed/in2out/slowpath/udp': udpn,
+ '/nat44-ed/in2out/slowpath/icmp': icmpn,
+ },
+ }
+ )
def test_unknown_proto(self):
""" NAT44ED translate packet with unknown protocol """
self.nat_add_inside_interface(self.pg0)
self.nat_add_outside_interface(self.pg0)
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
sw_if_index=self.pg1.sw_if_index, is_add=1)
# from client to service
self.nat_add_inside_interface(self.pg0)
self.nat_add_outside_interface(self.pg0)
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
sw_if_index=self.pg1.sw_if_index, is_add=1)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
self.assertEqual(server2_n, 0)
self.assertGreater(server3_n, 0)
- def test_syslog_sess(self):
+ # put zzz in front of syslog test name so that it runs as a last test
+ # setting syslog sender cannot be undone and if it is set, it messes
+ # with self.send_and_assert_no_replies functionality
+ def test_zzz_syslog_sess(self):
""" NAT44ED Test syslog session creation and deletion """
self.vapi.syslog_set_filter(
self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
capture = self.pg1.get_capture(1)
self.tcp_port_out = capture[0][TCP].sport
capture = self.pg3.get_capture(1)
- self.verify_syslog_sess(capture[0][Raw].load)
+ self.verify_syslog_sess(capture[0][Raw].load, 'SADD')
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.nat_add_address(self.nat_addr, is_add=0)
capture = self.pg3.get_capture(1)
- self.verify_syslog_sess(capture[0][Raw].load, False)
+ self.verify_syslog_sess(capture[0][Raw].load, 'SDEL')
+
+ # put zzz in front of syslog test name so that it runs as a last test
+ # setting syslog sender cannot be undone and if it is set, it messes
+ # with self.send_and_assert_no_replies functionality
+ def test_zzz_syslog_sess_reopen(self):
+ """ Syslog events for session reopen """
+ self.vapi.syslog_set_filter(
+ self.syslog_severity.SYSLOG_API_SEVERITY_INFO)
+ self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # SYN in2out
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
+ capture = self.send_and_expect(self.pg0, p, self.pg1)[0]
+ self.tcp_port_out = capture[0][TCP].sport
+ capture = self.pg3.get_capture(1)
+ self.verify_syslog_sess(capture[0][Raw].load, 'SADD')
+
+ # SYN out2in
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # FIN in2out
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="F"))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # FIN out2in
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="F"))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # SYN in2out
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # SYN out2in
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # 2 records should be produced - first one del & add
+ capture = self.pg3.get_capture(2)
+ self.verify_syslog_sess(capture[0][Raw].load, 'SDEL')
+ self.verify_syslog_sess(capture[1][Raw].load, 'SADD')
def test_twice_nat_interface_addr(self):
""" NAT44ED Acquire twice NAT addresses from interface """
""" NAT44ED output feature works with stateful ACL """
self.nat_add_address(self.nat_addr)
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
sw_if_index=self.pg1.sw_if_index, is_add=1)
# First ensure that the NAT is working sans ACL
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg0.sw_if_index,
flags=flags, is_add=1)
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
is_add=1,
sw_if_index=self.pg1.sw_if_index)
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=service_ip) /
TCP(sport=33898, dport=80, flags="S"))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- capture = self.pg0.get_capture(1)
+ capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
p = capture[0]
tcp_port = p[TCP].sport
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
TCP(sport=80, dport=tcp_port, flags="SA"))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
# ACK packet out->in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=service_ip) /
TCP(sport=33898, dport=80, flags="A"))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
# FIN packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
# FIN+ACK packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=service_ip) /
TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ self.send_and_expect(self.pg1, p, self.pg0, n_rx=1)
# ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1, n_rx=1)
- # session now in transitory timeout
- # try SYN packet out->in - should be dropped
+ # session now in transitory timeout, but traffic still flows
+ # try FIN packet out->in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=service_ip) /
- TCP(sport=33898, dport=80, flags="S"))
+ TCP(sport=33898, dport=80, flags="F"))
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- self.sleep(new_transitory, "wait for transitory timeout")
- self.pg0.assert_nothing_captured(0)
+ self.virtual_sleep(new_transitory, "wait for transitory timeout")
+ self.pg0.get_capture(1)
# session should still exist
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=service_ip) /
TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
+ self.send_and_assert_no_replies(self.pg1, p)
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - start_sessnum, 0)
- self.pg0.assert_nothing_captured(0)
def test_tcp_session_close_in(self):
""" NAT44ED Close TCP session from inside network """
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="FA", seq=100, ack=300))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
-
+ self.send_and_expect(self.pg0, p, self.pg1)
pkts = []
# ACK packet out -> in
flags="FA", seq=300, ack=101))
pkts.append(p)
- self.pg1.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(2)
+ self.send_and_expect(self.pg1, pkts, self.pg0)
# ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1)
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - session_n, 1)
- out2in_drops = self.get_err_counter(
- '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
- in2out_drops = self.get_err_counter(
- '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
-
- # extra FIN packet out -> in - this should be dropped
+ # retransmit FIN packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
TCP(sport=ext_port, dport=out_port,
flags="FA", seq=300, ack=101))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.assert_nothing_captured()
+ self.send_and_expect(self.pg1, p, self.pg0)
- # extra ACK packet in -> out - this should be dropped
+ # retransmit ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
-
- stats = self.get_err_counter(
- '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
- self.assertEqual(stats - out2in_drops, 1)
- stats = self.get_err_counter(
- '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
- self.assertEqual(stats - in2out_drops, 1)
+ self.send_and_expect(self.pg0, p, self.pg1)
- self.sleep(3)
- # extra ACK packet in -> out - this will cause session to be wiped
+ self.virtual_sleep(3)
+ # retransmit ACK packet in -> out - this will cause session to be wiped
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
+ self.send_and_assert_no_replies(self.pg0, p)
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - session_n, 0)
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - session_n, 1)
- out2in_drops = self.get_err_counter(
- '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
- in2out_drops = self.get_err_counter(
- '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
-
- # extra FIN packet out -> in - this should be dropped
+ # retransmit FIN packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
TCP(sport=ext_port, dport=out_port,
flags="FA", seq=300, ack=101))
+ self.send_and_expect(self.pg1, p, self.pg0)
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.assert_nothing_captured()
-
- # extra ACK packet in -> out - this should be dropped
+ # retransmit ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
-
- stats = self.get_err_counter(
- '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
- self.assertEqual(stats - out2in_drops, 1)
- stats = self.get_err_counter(
- '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
- self.assertEqual(stats - in2out_drops, 1)
+ self.send_and_expect(self.pg0, p, self.pg1)
- self.sleep(3)
- # extra ACK packet in -> out - this will cause session to be wiped
+ self.virtual_sleep(3)
+ # retransmit ACK packet in -> out - this will cause session to be wiped
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
+ self.send_and_assert_no_replies(self.pg0, p)
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - session_n, 0)
def test_tcp_session_close_simultaneous(self):
- """ NAT44ED Close TCP session from inside network """
+ """ Simultaneous TCP close from both sides """
in_port = self.tcp_port_in
ext_port = 10505
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="FA", seq=100, ack=300))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1)
# FIN packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
TCP(sport=ext_port, dport=out_port,
flags="FA", seq=300, ack=100))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ self.send_and_expect(self.pg1, p, self.pg0)
# ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
+ self.send_and_expect(self.pg0, p, self.pg1)
# ACK packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
TCP(sport=ext_port, dport=out_port,
flags="A", seq=301, ack=101))
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.get_capture(1)
+ self.send_and_expect(self.pg1, p, self.pg0)
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - session_n, 1)
- out2in_drops = self.get_err_counter(
- '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
- in2out_drops = self.get_err_counter(
- '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
-
- # extra FIN packet out -> in - this should be dropped
+ # retransmit FIN packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
TCP(sport=ext_port, dport=out_port,
flags="FA", seq=300, ack=101))
+ self.send_and_expect(self.pg1, p, self.pg0)
- self.pg1.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg0.assert_nothing_captured()
+ # retransmit ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="A", seq=101, ack=301))
+ self.send_and_expect(self.pg0, p, self.pg1)
- # extra ACK packet in -> out - this should be dropped
+ self.virtual_sleep(3)
+ # retransmit ACK packet in -> out - this will cause session to be wiped
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
+ self.pg_send(self.pg0, p)
+ self.send_and_assert_no_replies(self.pg0, p)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ self.assertEqual(len(sessions) - session_n, 0)
- stats = self.get_err_counter(
- '/err/nat44-ed-out2in/drops due to TCP in transitory timeout')
- self.assertEqual(stats - out2in_drops, 1)
- stats = self.get_err_counter(
- '/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
- self.assertEqual(stats - in2out_drops, 1)
+ def test_tcp_session_half_reopen_inside(self):
+ """ TCP session in FIN/FIN state not reopened by in2out SYN only """
+ in_port = self.tcp_port_in
+ ext_port = 10505
- self.sleep(3)
- # extra ACK packet in -> out - this will cause session to be wiped
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+ self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
+ in_port, ext_port, proto=IP_PROTOS.tcp,
+ flags=self.config_flags.NAT_IS_TWICE_NAT)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ session_n = len(sessions)
+
+ self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
+ tcp_transitory=2, icmp=5)
+
+ out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
+
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="FA", seq=100, ack=300))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port,
+ flags="FA", seq=300, ack=100))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ self.assertEqual(len(sessions) - session_n, 1)
+
+ # send SYN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="S", seq=101, ack=301))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ self.virtual_sleep(3)
+ # send ACK packet in -> out - session should be wiped
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
TCP(sport=in_port, dport=ext_port,
flags="A", seq=101, ack=301))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.assert_nothing_captured()
+ self.send_and_assert_no_replies(self.pg0, p, self.pg1)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ self.assertEqual(len(sessions) - session_n, 0)
+
+ def test_tcp_session_half_reopen_outside(self):
+ """ TCP session in FIN/FIN state not reopened by out2in SYN only """
+ in_port = self.tcp_port_in
+ ext_port = 10505
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+ self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
+ in_port, ext_port, proto=IP_PROTOS.tcp,
+ flags=self.config_flags.NAT_IS_TWICE_NAT)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ session_n = len(sessions)
+
+ self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
+ tcp_transitory=2, icmp=5)
+
+ out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
+
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="FA", seq=100, ack=300))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port,
+ flags="FA", seq=300, ack=100))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ self.assertEqual(len(sessions) - session_n, 1)
+
+ # send SYN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port,
+ flags="S", seq=300, ack=101))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ self.virtual_sleep(3)
+ # send ACK packet in -> out - session should be wiped
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="A", seq=101, ack=301))
+ self.send_and_assert_no_replies(self.pg0, p, self.pg1)
sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(len(sessions) - session_n, 0)
+ def test_tcp_session_reopen(self):
+ """ TCP session in FIN/FIN state reopened by SYN from both sides """
+ in_port = self.tcp_port_in
+ ext_port = 10505
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+ self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
+ in_port, ext_port, proto=IP_PROTOS.tcp,
+ flags=self.config_flags.NAT_IS_TWICE_NAT)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ session_n = len(sessions)
+
+ self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
+ tcp_transitory=2, icmp=5)
+
+ out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port)
+
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="FA", seq=100, ack=300))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port,
+ flags="FA", seq=300, ack=100))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ self.assertEqual(len(sessions) - session_n, 1)
+
+ # send SYN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port,
+ flags="S", seq=300, ack=101))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # send SYN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="S", seq=101, ack=301))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ self.virtual_sleep(3)
+ # send ACK packet in -> out - should be forwarded and session alive
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port,
+ flags="A", seq=101, ack=301))
+ self.send_and_expect(self.pg0, p, self.pg1)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
+ self.assertEqual(len(sessions) - session_n, 1)
+
def test_dynamic_vrf(self):
""" NAT44ED dynamic translation test: different VRF"""
self.logger.info(ppp("p2 packet:", p2))
self.logger.info(ppp("capture packet:", capture))
+ def test_tcp_session_open_retransmit1(self):
+ """ NAT44ED Open TCP session with SYN,ACK retransmit 1
+
+ The client does not receive the [SYN,ACK] or the
+ ACK from the client is lost. Therefore, the [SYN, ACK]
+ is retransmitted by the server.
+ """
+
+ in_port = self.tcp_port_in
+ ext_port = self.tcp_external_port
+ payload = "H" * 10
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
+ tcp_transitory=5, icmp=60)
+ # SYN packet in->out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="S"))
+ p = self.send_and_expect(self.pg0, p, self.pg1)[0]
+ out_port = p[TCP].sport
+
+ # SYN + ACK packet out->in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port, flags="SA"))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # ACK in->out does not arrive
+
+ # resent SYN + ACK packet out->in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port, flags="SA"))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # ACK packet in->out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="A"))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # Verify that the data can be transmitted after the transitory time
+ self.virtual_sleep(6)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="PA") /
+ Raw(payload))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ def test_tcp_session_open_retransmit2(self):
+ """ NAT44ED Open TCP session with SYN,ACK retransmit 2
+
+ The ACK is lost to the server after the TCP session is opened.
+ Data is sent by the client, then the [SYN,ACK] is
+ retransmitted by the server.
+ """
+
+ in_port = self.tcp_port_in
+ ext_port = self.tcp_external_port
+ payload = "H" * 10
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
+ tcp_transitory=5, icmp=60)
+ # SYN packet in->out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="S"))
+ p = self.send_and_expect(self.pg0, p, self.pg1)[0]
+ out_port = p[TCP].sport
+
+ # SYN + ACK packet out->in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port, flags="SA"))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # ACK packet in->out -- not received by the server
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="A"))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # PUSH + ACK packet in->out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="PA") /
+ Raw(payload))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # resent SYN + ACK packet out->in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port, flags="SA"))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # resent ACK packet in->out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="A"))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # resent PUSH + ACK packet in->out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="PA") /
+ Raw(payload))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
+ # ACK packet out->in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=ext_port, dport=out_port, flags="A"))
+ self.send_and_expect(self.pg1, p, self.pg0)
+
+ # Verify that the data can be transmitted after the transitory time
+ self.virtual_sleep(6)
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=in_port, dport=ext_port, flags="PA") /
+ Raw(payload))
+ self.send_and_expect(self.pg0, p, self.pg1)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)