+ def test_forwarding(self):
+ """ NAT44 forwarding test """
+
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.nat44_forwarding_enable_disable(1)
+
+ real_ip = self.pg0.remote_ip4n
+ alias_ip = self.nat_addr_n
+ self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
+ external_ip=alias_ip)
+
+ try:
+ # in2out - static mapping match
+
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, same_port=True)
+
+ # in2out - no static mapping match
+
+ host0 = self.pg0.remote_hosts[0]
+ self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
+ try:
+ pkts = self.create_stream_out(self.pg1,
+ dst_ip=self.pg0.remote_ip4,
+ use_inside_ports=True)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
+ same_port=True)
+ finally:
+ self.pg0.remote_hosts[0] = host0
+
+ finally:
+ self.vapi.nat44_forwarding_enable_disable(0)
+ self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
+ external_ip=alias_ip,
+ is_add=0)
+