NAT: test cleanup (VPP-1252)
[vpp.git] / test / test_nat.py
index 73e414a..47e779b 100644 (file)
@@ -138,6 +138,7 @@ class MethodHolder(VppTestCase):
         self.vapi.nat_set_reass(is_ip6=1)
         self.verify_no_nat44_user()
         self.vapi.nat_set_timeouts()
+        self.vapi.nat_set_addr_and_port_alloc_alg()
 
     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                                  local_port=0, external_port=0, vrf_id=0,
@@ -421,14 +422,13 @@ class MethodHolder(VppTestCase):
         return pkts
 
     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
-                           packet_num=3, dst_ip=None, is_ip6=False):
+                           dst_ip=None, is_ip6=False):
         """
         Verify captured packets on outside network
 
         :param capture: Captured packets
         :param nat_ip: Translated IP address (Default use global NAT address)
         :param same_port: Sorce port number is not translated (Default False)
-        :param packet_num: Expected number of packets (Default 3)
         :param dst_ip: Destination IP address (Default do not verify)
         :param is_ip6: If L3 protocol is IPv6 (Default False)
         """
@@ -440,7 +440,6 @@ class MethodHolder(VppTestCase):
             ICMP46 = ICMP
         if nat_ip is None:
             nat_ip = self.nat_addr
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 if not is_ip6:
@@ -476,28 +475,25 @@ class MethodHolder(VppTestCase):
                 raise
 
     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
-                               packet_num=3, dst_ip=None):
+                               dst_ip=None):
         """
         Verify captured packets on outside network
 
         :param capture: Captured packets
         :param nat_ip: Translated IP address
         :param same_port: Sorce port number is not translated (Default False)
-        :param packet_num: Expected number of packets (Default 3)
         :param dst_ip: Destination IP address (Default do not verify)
         """
-        return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
-                                       dst_ip, True)
+        return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
+                                       True)
 
-    def verify_capture_in(self, capture, in_if, packet_num=3):
+    def verify_capture_in(self, capture, in_if):
         """
         Verify captured packets on inside network
 
         :param capture: Captured packets
         :param in_if: Inside interface
-        :param packet_num: Expected number of packets (Default 3)
         """
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assert_packet_checksums_valid(packet)
@@ -513,16 +509,14 @@ class MethodHolder(VppTestCase):
                                       "(inside network):", packet))
                 raise
 
-    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
+    def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
         """
         Verify captured IPv6 packets on inside network
 
         :param capture: Captured packets
         :param src_ip: Source IP
         :param dst_ip: Destination IP address
-        :param packet_num: Expected number of packets (Default 3)
         """
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IPv6].src, src_ip)
@@ -564,20 +558,18 @@ class MethodHolder(VppTestCase):
                 raise
 
     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
-                                            packet_num=3, icmp_type=11):
+                                            icmp_type=11):
         """
         Verify captured packets with ICMP errors on outside network
 
         :param capture: Captured packets
         :param src_ip: Translated IP address or IP address of VPP
                        (Default use global NAT address)
-        :param packet_num: Expected number of packets (Default 3)
         :param icmp_type: Type of error ICMP packet
                           we are expecting (Default 11)
         """
         if src_ip is None:
             src_ip = self.nat_addr
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, src_ip)
@@ -599,18 +591,15 @@ class MethodHolder(VppTestCase):
                                       "(outside network):", packet))
                 raise
 
-    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
-                                           icmp_type=11):
+    def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
         """
         Verify captured packets with ICMP errors on inside network
 
         :param capture: Captured packets
         :param in_if: Inside interface
-        :param packet_num: Expected number of packets (Default 3)
         :param icmp_type: Type of error ICMP packet
                           we are expecting (Default 11)
         """
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
@@ -1236,7 +1225,6 @@ class TestNAT44(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))
-        self.assertEqual(1, len(capture))
         packet = capture[0]
         try:
             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
@@ -1264,7 +1252,7 @@ class TestNAT44(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg0.get_capture(1)
-        self.verify_capture_in(capture, self.pg0, packet_num=1)
+        self.verify_capture_in(capture, self.pg0)
         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
 
         # in2out
@@ -1275,7 +1263,7 @@ class TestNAT44(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(1)
-        self.verify_capture_out(capture, same_port=True, packet_num=1)
+        self.verify_capture_out(capture, same_port=True)
         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
 
     def test_forwarding(self):
@@ -3098,8 +3086,10 @@ class TestNAT44(MethodHolder):
         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                   is_inside=0)
-        self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
-                      "psid-offset 6 psid-len 6")
+        self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
+                                                  psid_offset=6,
+                                                  psid_length=6,
+                                                  psid=10)
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
@@ -3122,6 +3112,31 @@ class TestNAT44(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
+    def test_port_range(self):
+        """ External address port range """
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+        self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
+                                                  start_port=1025,
+                                                  end_port=1027)
+
+        pkts = []
+        for port in range(0, 5):
+            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+                 TCP(sport=1125 + port))
+            pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        for p in capture:
+            tcp = p[TCP]
+            self.assertGreaterEqual(tcp.sport, 1025)
+            self.assertLessEqual(tcp.sport, 1027)
+
     def test_ipfix_max_frags(self):
         """ IPFIX logging maximum fragments pending reassembly exceeded """
         self.nat44_add_address(self.nat_addr)
@@ -3290,7 +3305,8 @@ class TestNAT44(MethodHolder):
             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
             self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
             self.logger.info(self.vapi.cli("show nat timeouts"))
-            self.vapi.cli("nat addr-port-assignment-alg default")
+            self.logger.info(
+                self.vapi.cli("show nat addr-port-assignment-alg"))
             self.clear_nat44()
             self.vapi.cli("clear logging")
 
@@ -5363,18 +5379,16 @@ class TestDeterministicNAT(MethodHolder):
 
         return pkts
 
-    def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
+    def verify_capture_out(self, capture, nat_ip=None):
         """
         Verify captured packets on outside network
 
         :param capture: Captured packets
         :param nat_ip: Translated IP address (Default use global NAT address)
         :param same_port: Sorce port number is not translated (Default False)
-        :param packet_num: Expected number of packets (Default 3)
         """
         if nat_ip is None:
             nat_ip = self.nat_addr
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
@@ -5938,7 +5952,6 @@ class TestNAT64(MethodHolder):
 
         # Wait for Neighbor Solicitation
         capture = self.pg5.get_capture(len(pkts))
-        self.assertEqual(1, len(capture))
         packet = capture[0]
         try:
             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
@@ -5966,7 +5979,6 @@ class TestNAT64(MethodHolder):
 
         # Wait for ping reply
         capture = self.pg5.get_capture(len(pkts))
-        self.assertEqual(1, len(capture))
         packet = capture[0]
         try:
             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
@@ -7292,7 +7304,6 @@ class TestDSlite(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(1)
-        self.assertEqual(1, len(capture))
         capture = capture[0]
         self.assertEqual(capture[IPv6].src, aftr_ip6)
         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
@@ -7399,7 +7410,6 @@ class TestDSliteCE(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(1)
-        self.assertEqual(1, len(capture))
         capture = capture[0]
         self.assertEqual(capture[IPv6].src, b4_ip6)
         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)