nat: fix LRU blocked by inactive session
[vpp.git] / src / plugins / nat / test / test_nat.py
index a8e2af7..2ba7a9b 100644 (file)
@@ -7059,6 +7059,111 @@ class TestNAT44EndpointDependent(MethodHolder):
         self.logger.info(self.vapi.cli("show nat timeouts"))
 
 
+class TestNAT44EndpointDependent2(MethodHolder):
+    """ Endpoint-Dependent mapping and filtering extra test cases """
+
+    translation_buckets = 5
+
+    @classmethod
+    def setUpConstants(cls):
+        super(TestNAT44EndpointDependent2, cls).setUpConstants()
+        cls.vpp_cmdline.extend([
+            "nat", "{", "endpoint-dependent",
+            "translation hash buckets %d" % cls.translation_buckets,
+            "}"
+        ])
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestNAT44EndpointDependent2, cls).setUpClass()
+        cls.vapi.cli("set log class nat level debug")
+
+        cls.nat_addr = '10.0.0.3'
+
+        cls.create_pg_interfaces(range(2))
+
+        for i in cls.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def setUp(self):
+        super(TestNAT44EndpointDependent2, self).setUp()
+        self.vapi.nat_set_timeouts(
+            udp=1, tcp_established=7440, tcp_transitory=30, icmp=1)
+        self.nat44_add_address(self.nat_addr)
+        flags = self.config_flags.NAT_IS_INSIDE
+        self.vapi.nat44_interface_add_del_feature(
+            sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1)
+        self.vapi.nat44_interface_add_del_feature(
+            sw_if_index=self.pg1.sw_if_index, is_add=1)
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestNAT44EndpointDependent2, cls).tearDownClass()
+
+    def init_tcp_session(self, in_if, out_if, sport, ext_dport):
+        # SYN packet in->out
+        p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
+             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+             TCP(sport=sport, dport=ext_dport, flags="S"))
+        in_if.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = out_if.get_capture(1)
+        p = capture[0]
+        tcp_port_out = p[TCP].sport
+
+        # SYN + ACK packet out->in
+        p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
+             IP(src=out_if.remote_ip4, dst=self.nat_addr) /
+             TCP(sport=ext_dport, dport=tcp_port_out, flags="SA"))
+        out_if.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        in_if.get_capture(1)
+
+        # ACK packet in->out
+        p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
+             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+             TCP(sport=sport, dport=ext_dport, flags="A"))
+        in_if.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        out_if.get_capture(1)
+
+        return tcp_port_out
+
+    def test_lru_cleanup(self):
+        """ LRU cleanup algorithm """
+        tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80)
+        max_translations = 10 * self.translation_buckets
+        pkts = []
+        for i in range(0, max_translations - 1):
+            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
+                 UDP(sport=7000+i, dport=80))
+            pkts.append(p)
+
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        self.pg1.get_capture(len(pkts))
+        self.sleep(1.5, "wait for timeouts")
+
+        pkts = []
+        for i in range(0, max_translations - 1):
+            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
+                 ICMP(id=8000+i, type='echo-request'))
+            pkts.append(p)
+
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        self.pg1.get_capture(len(pkts))
+
+
 class TestNAT44Out2InDPO(MethodHolder):
     """ NAT44 Test Cases using out2in DPO """