X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_nat.py;h=47e779b90260504db5a6a65950a81702fdc56125;hb=05ca4a364366ffd639b6136967330deb249cbe22;hp=73e414ad1aed13c15159a368b59183190a1f5a6b;hpb=ea5b5be4eeb0f4cd80cb466bd6e31cad33c57960;p=vpp.git diff --git a/test/test_nat.py b/test/test_nat.py index 73e414ad1ae..47e779b9026 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -138,6 +138,7 @@ class MethodHolder(VppTestCase): self.vapi.nat_set_reass(is_ip6=1) self.verify_no_nat44_user() self.vapi.nat_set_timeouts() + self.vapi.nat_set_addr_and_port_alloc_alg() def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0', local_port=0, external_port=0, vrf_id=0, @@ -421,14 +422,13 @@ class MethodHolder(VppTestCase): return pkts def verify_capture_out(self, capture, nat_ip=None, same_port=False, - packet_num=3, dst_ip=None, is_ip6=False): + dst_ip=None, is_ip6=False): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) :param same_port: Sorce port number is not translated (Default False) - :param packet_num: Expected number of packets (Default 3) :param dst_ip: Destination IP address (Default do not verify) :param is_ip6: If L3 protocol is IPv6 (Default False) """ @@ -440,7 +440,6 @@ class MethodHolder(VppTestCase): ICMP46 = ICMP if nat_ip is None: nat_ip = self.nat_addr - self.assertEqual(packet_num, len(capture)) for packet in capture: try: if not is_ip6: @@ -476,28 +475,25 @@ class MethodHolder(VppTestCase): raise def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, - packet_num=3, dst_ip=None): + dst_ip=None): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address :param same_port: Sorce port number is not translated (Default False) - :param packet_num: Expected number of packets (Default 3) :param dst_ip: Destination IP address (Default do not verify) """ - return self.verify_capture_out(capture, nat_ip, same_port, packet_num, - dst_ip, True) + return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, + True) - def verify_capture_in(self, capture, in_if, packet_num=3): + def verify_capture_in(self, capture, in_if): """ Verify captured packets on inside network :param capture: Captured packets :param in_if: Inside interface - :param packet_num: Expected number of packets (Default 3) """ - self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assert_packet_checksums_valid(packet) @@ -513,16 +509,14 @@ class MethodHolder(VppTestCase): "(inside network):", packet)) raise - def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3): + def verify_capture_in_ip6(self, capture, src_ip, dst_ip): """ Verify captured IPv6 packets on inside network :param capture: Captured packets :param src_ip: Source IP :param dst_ip: Destination IP address - :param packet_num: Expected number of packets (Default 3) """ - self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IPv6].src, src_ip) @@ -564,20 +558,18 @@ class MethodHolder(VppTestCase): raise def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, - packet_num=3, icmp_type=11): + icmp_type=11): """ Verify captured packets with ICMP errors on outside network :param capture: Captured packets :param src_ip: Translated IP address or IP address of VPP (Default use global NAT address) - :param packet_num: Expected number of packets (Default 3) :param icmp_type: Type of error ICMP packet we are expecting (Default 11) """ if src_ip is None: src_ip = self.nat_addr - self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IP].src, src_ip) @@ -599,18 +591,15 @@ class MethodHolder(VppTestCase): "(outside network):", packet)) raise - def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3, - icmp_type=11): + def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11): """ Verify captured packets with ICMP errors on inside network :param capture: Captured packets :param in_if: Inside interface - :param packet_num: Expected number of packets (Default 3) :param icmp_type: Type of error ICMP packet we are expecting (Default 11) """ - self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IP].dst, in_if.remote_ip4) @@ -1236,7 +1225,6 @@ class TestNAT44(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.assertEqual(1, len(capture)) packet = capture[0] try: self.assertEqual(packet[IP].src, self.pg1.local_ip4) @@ -1264,7 +1252,7 @@ class TestNAT44(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(1) - self.verify_capture_in(capture, self.pg0, packet_num=1) + self.verify_capture_in(capture, self.pg0) self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) # in2out @@ -1275,7 +1263,7 @@ class TestNAT44(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) - self.verify_capture_out(capture, same_port=True, packet_num=1) + self.verify_capture_out(capture, same_port=True) self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) def test_forwarding(self): @@ -3098,8 +3086,10 @@ class TestNAT44(MethodHolder): self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) - self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 " - "psid-offset 6 psid-len 6") + self.vapi.nat_set_addr_and_port_alloc_alg(alg=1, + psid_offset=6, + psid_length=6, + psid=10) p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / @@ -3122,6 +3112,31 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise + def test_port_range(self): + """ External address port range """ + self.nat44_add_address(self.nat_addr) + self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + self.vapi.nat_set_addr_and_port_alloc_alg(alg=2, + start_port=1025, + end_port=1027) + + pkts = [] + for port in range(0, 5): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=1125 + port)) + pkts.append(p) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(3) + for p in capture: + tcp = p[TCP] + self.assertGreaterEqual(tcp.sport, 1025) + self.assertLessEqual(tcp.sport, 1027) + def test_ipfix_max_frags(self): """ IPFIX logging maximum fragments pending reassembly exceeded """ self.nat44_add_address(self.nat_addr) @@ -3290,7 +3305,8 @@ class TestNAT44(MethodHolder): self.logger.info(self.vapi.cli("show nat virtual-reassembly")) self.logger.info(self.vapi.cli("show nat44 hash tables detail")) self.logger.info(self.vapi.cli("show nat timeouts")) - self.vapi.cli("nat addr-port-assignment-alg default") + self.logger.info( + self.vapi.cli("show nat addr-port-assignment-alg")) self.clear_nat44() self.vapi.cli("clear logging") @@ -5363,18 +5379,16 @@ class TestDeterministicNAT(MethodHolder): return pkts - def verify_capture_out(self, capture, nat_ip=None, packet_num=3): + def verify_capture_out(self, capture, nat_ip=None): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) :param same_port: Sorce port number is not translated (Default False) - :param packet_num: Expected number of packets (Default 3) """ if nat_ip is None: nat_ip = self.nat_addr - self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IP].src, nat_ip) @@ -5938,7 +5952,6 @@ class TestNAT64(MethodHolder): # Wait for Neighbor Solicitation capture = self.pg5.get_capture(len(pkts)) - self.assertEqual(1, len(capture)) packet = capture[0] try: self.assertEqual(packet[IPv6].src, self.pg5.local_ip6) @@ -5966,7 +5979,6 @@ class TestNAT64(MethodHolder): # Wait for ping reply capture = self.pg5.get_capture(len(pkts)) - self.assertEqual(1, len(capture)) packet = capture[0] try: self.assertEqual(packet[IPv6].src, self.pg5.local_ip6) @@ -7292,7 +7304,6 @@ class TestDSlite(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) - self.assertEqual(1, len(capture)) capture = capture[0] self.assertEqual(capture[IPv6].src, aftr_ip6) self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6) @@ -7399,7 +7410,6 @@ class TestDSliteCE(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) - self.assertEqual(1, len(capture)) capture = capture[0] self.assertEqual(capture[IPv6].src, b4_ip6) self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)