X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fnat%2Ftest%2Ftest_nat.py;h=0daa61042c17b17bbdad74d9fc73fe078b0d1cf8;hb=e6eaa24f156e908dcbb92209c4f50f8da8066d79;hp=29fd5ca23c582263f361c01eea43076fd62df9ed;hpb=b644eb54f268b7cd321bdd35632b31112c6783c6;p=vpp.git diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py index 29fd5ca23c5..0daa61042c1 100644 --- a/src/plugins/nat/test/test_nat.py +++ b/src/plugins/nat/test/test_nat.py @@ -2606,61 +2606,6 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise - def test_max_translations_per_user(self): - """ MAX translations per user - recycle the least recently used """ - - self.nat44_add_address(self.nat_addr) - flags = self.config_flags.NAT_IS_INSIDE - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) - self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - # get maximum number of translations per user - nat44_config = self.vapi.nat_show_config() - - # send more than maximum number of translations per user packets - pkts_num = nat44_config.max_translations_per_user + 5 - pkts = [] - for port in range(0, pkts_num): - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=1025 + port)) - pkts.append(p) - self.pg0.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - - # verify number of translated packet - self.pg1.get_capture(pkts_num) - - users = self.vapi.nat44_user_dump() - for user in users: - if user.ip_address == self.pg0.remote_ip4: - self.assertEqual(user.nsessions, - nat44_config.max_translations_per_user) - self.assertEqual(user.nstaticsessions, 0) - - tcp_port = 22 - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - tcp_port, tcp_port, - proto=IP_PROTOS.tcp) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=tcp_port)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - self.pg1.get_capture(1) - users = self.vapi.nat44_user_dump() - for user in users: - if user.ip_address == self.pg0.remote_ip4: - self.assertEqual(user.nsessions, - nat44_config.max_translations_per_user - 1) - self.assertEqual(user.nstaticsessions, 1) - def test_interface_addr(self): """ Acquire NAT44 addresses from interface """ self.vapi.nat44_add_del_interface_addr( @@ -4386,6 +4331,101 @@ class TestNAT44(MethodHolder): self.logger.info(self.vapi.cli("show nat ha")) +class TestNAT44EndpointDependent2(MethodHolder): + """ Endpoint-Dependent session test cases """ + + icmp_timeout = 2 + + @classmethod + def setUpConstants(cls): + super(TestNAT44EndpointDependent2, cls).setUpConstants() + cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", + "translation", "hash", "buckets", "1", + "icmp", "timeout", str(cls.icmp_timeout), "}"]) + + @classmethod + def setUpClass(cls): + super(TestNAT44EndpointDependent2, cls).setUpClass() + try: + translation_buckets = 1 + cls.max_translations = 10 * translation_buckets + + cls.create_pg_interfaces(range(2)) + cls.interfaces = list(cls.pg_interfaces[0:2]) + + for i in cls.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + cls.pg0.generate_remote_hosts(1) + cls.pg0.configure_ipv4_neighbors() + + cls.pg1.generate_remote_hosts(1) + cls.pg1.configure_ipv4_neighbors() + + except Exception: + super(TestNAT44EndpointDependent2, cls).tearDownClass() + raise + + def create_icmp_stream(self, in_if, out_if, count): + """ + Create ICMP packet stream for inside network + + :param in_if: Inside interface + :param out_if: Outside interface + :param count: Number of packets + """ + + self.assertTrue(count > 0) + icmp_id = random.randint(0, 65535 - (count - 1)) + + pkts = list() + for i in range(count): + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) / + ICMP(id=icmp_id + i, type='echo-request')) + pkts.append(p) + return pkts + + def send_pkts(self, pkts, expected=None): + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + return self.pg1.get_capture( + len(pkts) if expected is None else expected) + + def test_session_cleanup(self): + """ NAT44 session cleanup test """ + + self.nat44_add_address(self.pg1.local_ip4) + flags = self.config_flags.NAT_IS_INSIDE + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg0.sw_if_index, + flags=flags, is_add=1) + self.vapi.nat44_interface_add_del_feature( + sw_if_index=self.pg1.sw_if_index, + is_add=1) + + nat_config = self.vapi.nat_show_config() + self.assertEqual(1, nat_config.endpoint_dependent) + + pkts = self.create_icmp_stream(self.pg0, self.pg1, + self.max_translations + 2) + sz = len(pkts) + + # positive test + self.send_pkts(pkts[0:self.max_translations]) + + # false positive test + self.send_pkts(pkts[self.max_translations:sz - 1], 0) + + sleep(self.icmp_timeout) + + # positive test + self.send_pkts(pkts[self.max_translations + 1:sz]) + + class TestNAT44EndpointDependent(MethodHolder): """ Endpoint-Dependent mapping and filtering test cases """