from vpp_papi import VppEnum
-class NAT44EDTestCase(VppTestCase):
+class TestNAT44ED(VppTestCase):
+ """ NAT44ED Test Case """
- nat_addr = '10.0.0.3'
+ nat_addr = '10.0.10.3'
tcp_port_in = 6303
tcp_port_out = 6303
max_sessions = 100
def setUp(self):
- super(NAT44EDTestCase, self).setUp()
+ super().setUp()
self.plugin_enable()
def tearDown(self):
- super(NAT44EDTestCase, self).tearDown()
+ super().tearDown()
if not self.vpp_dead:
self.plugin_disable()
@classmethod
def setUpClass(cls):
- super(NAT44EDTestCase, cls).setUpClass()
+ super().setUpClass()
cls.create_pg_interfaces(range(12))
cls.interfaces = list(cls.pg_interfaces[:4])
self.assertEqual(sd_params.get('XDPORT'),
"%d" % self.tcp_external_port)
+ def test_icmp_error(self):
+ """ NAT44ED test ICMP error message with inner header"""
-class TestNAT44ED(NAT44EDTestCase):
- """ NAT44ED Test Case """
+ payload = "H" * 10
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # in2out (initiate connection)
+ p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=21, dport=20) / payload)
+
+ self.pg0.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)[0]
+
+ # out2in (send error message)
+ p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ ICMP(type='dest-unreach', code='port-unreachable') /
+ capture[IP:])
+
+ self.pg1.add_stream(p2)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ capture = self.pg0.get_capture(1)[0]
+
+ self.logger.info(ppp("p1 packet:", p1))
+ self.logger.info(ppp("p2 packet:", p2))
+ self.logger.info(ppp("capture packet:", capture))
+
+ def test_icmp_echo_reply_trailer(self):
+ """ ICMP echo reply with ethernet trailer"""
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # in2out
+ p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP(type=8, id=0xabcd, seq=0))
+
+ self.pg0.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ c = self.pg1.get_capture(1)[0]
+
+ self.logger.debug(self.vapi.cli("show trace"))
+
+ # out2in
+ p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xee59) /
+ ICMP(type=0, id=c[ICMP].id, seq=0))
+
+ # force checksum calculation
+ p2 = p2.__class__(bytes(p2))
+
+ self.logger.debug(ppp("Packet before modification:", p2))
+
+ # hex representation of vss monitoring ethernet trailer
+ # this seems to be just added to end of packet without modifying
+ # IP or ICMP lengths / checksums
+ p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18")
+ # change it so that IP/ICMP is unaffected
+ p2[IP].len = 28
+
+ self.logger.debug(ppp("Packet with added trailer:", p2))
+
+ self.pg1.add_stream(p2)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.pg0.get_capture(1)
def test_users_dump(self):
""" NAT44ED API test - nat44_user_dump """
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, ignore_port=True)
+ self.logger.debug(self.vapi.cli("show trace"))
# out2in
pkts = self.create_stream_out(self.pg1)
self.pg_start()
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
+ self.logger.debug(self.vapi.cli("show trace"))
# in2out
pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture, ignore_port=True)
+ self.logger.debug(self.vapi.cli("show trace"))
# out2in
pkts = self.create_stream_out(self.pg1, ttl=2)
self.pg_start()
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
+ self.logger.debug(self.vapi.cli("show trace"))
# in2out
pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
vpp_worker_count = 4
max_sessions = 5000
- @unittest.skip('MW fix required')
- def test_users_dump(self):
- """ NAT44ED API test - nat44_user_dump """
-
- @unittest.skip('MW fix required')
- def test_frag_out_of_order_do_not_translate(self):
- """ NAT44ED don't translate fragments arriving out of order """
-
- @unittest.skip('MW fix required')
- def test_forwarding(self):
- """ NAT44ED forwarding test """
-
- @unittest.skip('MW fix required')
- def test_twice_nat(self):
- """ NAT44ED Twice NAT """
-
- @unittest.skip('MW fix required')
- def test_twice_nat_lb(self):
- """ NAT44ED Twice NAT local service load balancing """
-
- @unittest.skip('MW fix required')
- def test_output_feature(self):
- """ NAT44ED interface output feature (in2out postrouting) """
-
- @unittest.skip('MW fix required')
- def test_static_with_port_out2(self):
- """ NAT44ED 1:1 NAPT asymmetrical rule """
-
- @unittest.skip('MW fix required')
- def test_output_feature_and_service2(self):
- """ NAT44ED interface output feature and service host direct access """
-
- @unittest.skip('MW fix required')
- def test_static_lb(self):
- """ NAT44ED local service load balancing """
-
- @unittest.skip('MW fix required')
- def test_static_lb_2(self):
- """ NAT44ED local service load balancing (asymmetrical rule) """
-
- @unittest.skip('MW fix required')
- def test_lb_affinity(self):
- """ NAT44ED local service load balancing affinity """
-
- @unittest.skip('MW fix required')
- def test_multiple_vrf(self):
- """ NAT44ED Multiple VRF setup """
-
- @unittest.skip('MW fix required')
- def test_self_twice_nat_positive(self):
- """ NAT44ED Self Twice NAT (positive test) """
-
- @unittest.skip('MW fix required')
- def test_self_twice_nat_lb_positive(self):
- """ NAT44ED Self Twice NAT local service load balancing (positive test)
- """
-
def test_dynamic(self):
""" NAT44ED dynamic translation test """
pkt_count = 1500
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg1.get_capture(pkt_count * 3)
+ capture = self.pg1.get_capture(pkt_count * 3, timeout=5)
if_idx = self.pg0.sw_if_index
tc2 = self.statistics['/nat44-ed/in2out/slowpath/tcp']
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.get_capture(len(pkts))
- self.sleep(1.5, "wait for timeouts")
+ self.virtual_sleep(1.5, "wait for timeouts")
pkts = []
for i in range(0, self.max_sessions - 1):
self.pg_start()
self.pg1.get_capture(1)
- self.sleep(6)
+ self.virtual_sleep(6)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
self.nat_add_inside_interface(self.pg0)
self.nat_add_outside_interface(self.pg0)
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
sw_if_index=self.pg1.sw_if_index, is_add=1)
# from client to service
self.nat_add_inside_interface(self.pg0)
self.nat_add_outside_interface(self.pg0)
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
sw_if_index=self.pg1.sw_if_index, is_add=1)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
""" NAT44ED output feature works with stateful ACL """
self.nat_add_address(self.nat_addr)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=self.config_flags.NAT_IS_INSIDE, is_add=1)
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=self.config_flags.NAT_IS_OUTSIDE, is_add=1)
+ self.vapi.nat44_ed_add_del_output_interface(
+ sw_if_index=self.pg1.sw_if_index, is_add=1)
# First ensure that the NAT is working sans ACL
self.vapi.nat44_interface_add_del_feature(
sw_if_index=self.pg0.sw_if_index,
flags=flags, is_add=1)
- self.vapi.nat44_interface_add_del_output_feature(
+ self.vapi.nat44_ed_add_del_output_interface(
is_add=1,
sw_if_index=self.pg1.sw_if_index)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- self.sleep(new_transitory, "wait for transitory timeout")
+ self.virtual_sleep(new_transitory, "wait for transitory timeout")
self.pg0.assert_nothing_captured(0)
# session should still exist
'/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
self.assertEqual(stats - in2out_drops, 1)
- self.sleep(3)
+ self.virtual_sleep(3)
# extra ACK packet in -> out - this will cause session to be wiped
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
'/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
self.assertEqual(stats - in2out_drops, 1)
- self.sleep(3)
+ self.virtual_sleep(3)
# extra ACK packet in -> out - this will cause session to be wiped
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
'/err/nat44-ed-in2out/drops due to TCP in transitory timeout')
self.assertEqual(stats - in2out_drops, 1)
- self.sleep(3)
+ self.virtual_sleep(3)
# extra ACK packet in -> out - this will cause session to be wiped
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
new_vrf_id = 22
self.nat_add_address(self.nat_addr)
- flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
self.vapi.nat44_interface_add_del_output_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1)
try:
self.configure_ip4_interface(self.pg7, table_id=new_vrf_id)
capture = self.pg8.get_capture(len(pkts))
self.verify_capture_out(capture, ignore_port=True)
- if_idx = self.pg7.sw_if_index
+ if_idx = self.pg8.sw_if_index
cnt = self.statistics['/nat44-ed/in2out/slowpath/tcp']
self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
cnt = self.statistics['/nat44-ed/in2out/slowpath/udp']
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
+ def test_icmp_error_fwd_outbound(self):
+ """ NAT44ED ICMP error outbound with forwarding enabled """
+
+ # Ensure that an outbound ICMP error message is properly associated
+ # with the inbound forward bypass session it is related to.
+ payload = "H" * 10
+
+ self.nat_add_address(self.nat_addr)
+ self.nat_add_inside_interface(self.pg0)
+ self.nat_add_outside_interface(self.pg1)
+
+ # enable forwarding and initiate connection out2in
+ self.vapi.nat44_forwarding_enable_disable(enable=1)
+ p1 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
+ UDP(sport=21, dport=20) / payload)
+
+ self.pg1.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)[0]
+
+ self.logger.info(self.vapi.cli("show nat44 sessions"))
+
+ # reply with ICMP error message in2out
+ # We cannot reliably retrieve forward bypass sessions via the API.
+ # session dumps for a user will only look on the worker that the
+ # user is supposed to be mapped to in2out. The forward bypass session
+ # is not necessarily created on that worker.
+ p2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP(type='dest-unreach', code='port-unreachable') /
+ capture[IP:])
+
+ self.pg0.add_stream(p2)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)[0]
+
+ self.logger.info(self.vapi.cli("show nat44 sessions"))
+
+ self.logger.info(ppp("p1 packet:", p1))
+ self.logger.info(ppp("p2 packet:", p2))
+ self.logger.info(ppp("capture packet:", capture))
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)