+ def test_one_armed_nat44(self):
+ """ One armed NAT44 """
+ remote_host = self.pg9.remote_hosts[0]
+ local_host = self.pg9.remote_hosts[1]
+ external_port = 0
+
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+ IP(src=local_host.ip4, dst=remote_host.ip4) /
+ TCP(sport=12345, dport=80))
+ self.pg9.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg9.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(ip.dst, remote_host.ip4)
+ self.assertNotEqual(tcp.sport, 12345)
+ external_port = tcp.sport
+ self.assertEqual(tcp.dport, 80)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # out2in
+ p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+ IP(src=remote_host.ip4, dst=self.nat_addr) /
+ TCP(sport=80, dport=external_port))
+ self.pg9.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg9.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, remote_host.ip4)
+ self.assertEqual(ip.dst, local_host.ip4)
+ self.assertEqual(tcp.sport, 80)
+ self.assertEqual(tcp.dport, 12345)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_del_session(self):
+ """ Delete NAT44 session """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ nsessions = len(sessions)
+
+ self.vapi.nat44_del_session(sessions[0].inside_ip_address,
+ sessions[0].inside_port,
+ sessions[0].protocol)
+ self.vapi.nat44_del_session(sessions[1].outside_ip_address,
+ sessions[1].outside_port,
+ sessions[1].protocol,
+ is_in=0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ self.assertEqual(nsessions - len(sessions), 2)
+