NAT: test cleanup (VPP-1252)
[vpp.git] / test / test_nat.py
index 79d2622..47e779b 100644 (file)
@@ -138,6 +138,7 @@ class MethodHolder(VppTestCase):
         self.vapi.nat_set_reass(is_ip6=1)
         self.verify_no_nat44_user()
         self.vapi.nat_set_timeouts()
+        self.vapi.nat_set_addr_and_port_alloc_alg()
 
     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                                  local_port=0, external_port=0, vrf_id=0,
@@ -421,14 +422,13 @@ class MethodHolder(VppTestCase):
         return pkts
 
     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
-                           packet_num=3, dst_ip=None, is_ip6=False):
+                           dst_ip=None, is_ip6=False):
         """
         Verify captured packets on outside network
 
         :param capture: Captured packets
         :param nat_ip: Translated IP address (Default use global NAT address)
         :param same_port: Sorce port number is not translated (Default False)
-        :param packet_num: Expected number of packets (Default 3)
         :param dst_ip: Destination IP address (Default do not verify)
         :param is_ip6: If L3 protocol is IPv6 (Default False)
         """
@@ -440,7 +440,6 @@ class MethodHolder(VppTestCase):
             ICMP46 = ICMP
         if nat_ip is None:
             nat_ip = self.nat_addr
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 if not is_ip6:
@@ -476,28 +475,25 @@ class MethodHolder(VppTestCase):
                 raise
 
     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
-                               packet_num=3, dst_ip=None):
+                               dst_ip=None):
         """
         Verify captured packets on outside network
 
         :param capture: Captured packets
         :param nat_ip: Translated IP address
         :param same_port: Sorce port number is not translated (Default False)
-        :param packet_num: Expected number of packets (Default 3)
         :param dst_ip: Destination IP address (Default do not verify)
         """
-        return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
-                                       dst_ip, True)
+        return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
+                                       True)
 
-    def verify_capture_in(self, capture, in_if, packet_num=3):
+    def verify_capture_in(self, capture, in_if):
         """
         Verify captured packets on inside network
 
         :param capture: Captured packets
         :param in_if: Inside interface
-        :param packet_num: Expected number of packets (Default 3)
         """
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assert_packet_checksums_valid(packet)
@@ -513,16 +509,14 @@ class MethodHolder(VppTestCase):
                                       "(inside network):", packet))
                 raise
 
-    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
+    def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
         """
         Verify captured IPv6 packets on inside network
 
         :param capture: Captured packets
         :param src_ip: Source IP
         :param dst_ip: Destination IP address
-        :param packet_num: Expected number of packets (Default 3)
         """
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IPv6].src, src_ip)
@@ -564,20 +558,18 @@ class MethodHolder(VppTestCase):
                 raise
 
     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
-                                            packet_num=3, icmp_type=11):
+                                            icmp_type=11):
         """
         Verify captured packets with ICMP errors on outside network
 
         :param capture: Captured packets
         :param src_ip: Translated IP address or IP address of VPP
                        (Default use global NAT address)
-        :param packet_num: Expected number of packets (Default 3)
         :param icmp_type: Type of error ICMP packet
                           we are expecting (Default 11)
         """
         if src_ip is None:
             src_ip = self.nat_addr
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, src_ip)
@@ -599,18 +591,15 @@ class MethodHolder(VppTestCase):
                                       "(outside network):", packet))
                 raise
 
-    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
-                                           icmp_type=11):
+    def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
         """
         Verify captured packets with ICMP errors on inside network
 
         :param capture: Captured packets
         :param in_if: Inside interface
-        :param packet_num: Expected number of packets (Default 3)
         :param icmp_type: Type of error ICMP packet
                           we are expecting (Default 11)
         """
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
@@ -1236,7 +1225,6 @@ class TestNAT44(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(len(pkts))
-        self.assertEqual(1, len(capture))
         packet = capture[0]
         try:
             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
@@ -1264,7 +1252,7 @@ class TestNAT44(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg0.get_capture(1)
-        self.verify_capture_in(capture, self.pg0, packet_num=1)
+        self.verify_capture_in(capture, self.pg0)
         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
 
         # in2out
@@ -1275,7 +1263,7 @@ class TestNAT44(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(1)
-        self.verify_capture_out(capture, same_port=True, packet_num=1)
+        self.verify_capture_out(capture, same_port=True)
         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
 
     def test_forwarding(self):
@@ -3098,8 +3086,10 @@ class TestNAT44(MethodHolder):
         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                   is_inside=0)
-        self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
-                      "psid-offset 6 psid-len 6")
+        self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
+                                                  psid_offset=6,
+                                                  psid_length=6,
+                                                  psid=10)
 
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
@@ -3122,6 +3112,31 @@ class TestNAT44(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
+    def test_port_range(self):
+        """ External address port range """
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+        self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
+                                                  start_port=1025,
+                                                  end_port=1027)
+
+        pkts = []
+        for port in range(0, 5):
+            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+                 TCP(sport=1125 + port))
+            pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(3)
+        for p in capture:
+            tcp = p[TCP]
+            self.assertGreaterEqual(tcp.sport, 1025)
+            self.assertLessEqual(tcp.sport, 1027)
+
     def test_ipfix_max_frags(self):
         """ IPFIX logging maximum fragments pending reassembly exceeded """
         self.nat44_add_address(self.nat_addr)
@@ -3290,7 +3305,8 @@ class TestNAT44(MethodHolder):
             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
             self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
             self.logger.info(self.vapi.cli("show nat timeouts"))
-            self.vapi.cli("nat addr-port-assignment-alg default")
+            self.logger.info(
+                self.vapi.cli("show nat addr-port-assignment-alg"))
             self.clear_nat44()
             self.vapi.cli("clear logging")
 
@@ -3741,6 +3757,67 @@ class TestNAT44EndpointDependent(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
+    def test_lb_affinity(self):
+        """ NAT44 local service load balancing affinity """
+        external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
+        external_port = 80
+        local_port = 8080
+        server1 = self.pg0.remote_hosts[0]
+        server2 = self.pg0.remote_hosts[1]
+
+        locals = [{'addr': server1.ip4n,
+                   'port': local_port,
+                   'probability': 50,
+                   'vrf_id': 0},
+                  {'addr': server2.ip4n,
+                   'port': local_port,
+                   'probability': 50,
+                   'vrf_id': 0}]
+
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
+                                                  external_port,
+                                                  IP_PROTOS.tcp,
+                                                  affinity=10800,
+                                                  local_num=len(locals),
+                                                  locals=locals)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+
+        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+             IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+             TCP(sport=1025, dport=external_port))
+        self.pg1.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        backend = capture[0][IP].dst
+
+        sessions = self.vapi.nat44_user_session_dump(
+            socket.inet_pton(socket.AF_INET, backend), 0)
+        self.assertEqual(len(sessions), 1)
+        self.assertTrue(sessions[0].ext_host_valid)
+        self.vapi.nat44_del_session(
+            sessions[0].inside_ip_address,
+            sessions[0].inside_port,
+            sessions[0].protocol,
+            ext_host_address=sessions[0].ext_host_address,
+            ext_host_port=sessions[0].ext_host_port)
+
+        pkts = []
+        for port in range(1030, 1100):
+            p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+                 TCP(sport=port, dport=external_port))
+            pkts.append(p)
+        self.pg1.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg0.get_capture(len(pkts))
+        for p in capture:
+            self.assertEqual(p[IP].dst, backend)
+
     def test_unknown_proto(self):
         """ NAT44 translate packet with unknown protocol """
         self.nat44_add_address(self.nat_addr)
@@ -5302,18 +5379,16 @@ class TestDeterministicNAT(MethodHolder):
 
         return pkts
 
-    def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
+    def verify_capture_out(self, capture, nat_ip=None):
         """
         Verify captured packets on outside network
 
         :param capture: Captured packets
         :param nat_ip: Translated IP address (Default use global NAT address)
         :param same_port: Sorce port number is not translated (Default False)
-        :param packet_num: Expected number of packets (Default 3)
         """
         if nat_ip is None:
             nat_ip = self.nat_addr
-        self.assertEqual(packet_num, len(capture))
         for packet in capture:
             try:
                 self.assertEqual(packet[IP].src, nat_ip)
@@ -5877,7 +5952,6 @@ class TestNAT64(MethodHolder):
 
         # Wait for Neighbor Solicitation
         capture = self.pg5.get_capture(len(pkts))
-        self.assertEqual(1, len(capture))
         packet = capture[0]
         try:
             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
@@ -5905,7 +5979,6 @@ class TestNAT64(MethodHolder):
 
         # Wait for ping reply
         capture = self.pg5.get_capture(len(pkts))
-        self.assertEqual(1, len(capture))
         packet = capture[0]
         try:
             self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
@@ -7231,7 +7304,6 @@ class TestDSlite(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(1)
-        self.assertEqual(1, len(capture))
         capture = capture[0]
         self.assertEqual(capture[IPv6].src, aftr_ip6)
         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
@@ -7338,7 +7410,6 @@ class TestDSliteCE(MethodHolder):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
         capture = self.pg1.get_capture(1)
-        self.assertEqual(1, len(capture))
         capture = capture[0]
         self.assertEqual(capture[IPv6].src, b4_ip6)
         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)