make test: improve test results printing
[vpp.git] / test / test_snat.py
index 5cc76f6..653496e 100644 (file)
@@ -2,12 +2,12 @@
 
 import socket
 import unittest
-from logging import *
 
 from framework import VppTestCase, VppTestRunner
 
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.l2 import Ether
+from util import ppp
 
 
 class TestSNAT(VppTestCase):
@@ -26,7 +26,7 @@ class TestSNAT(VppTestCase):
             cls.icmp_id_out = 6305
             cls.snat_addr = '10.0.0.3'
 
-            cls.create_pg_interfaces(range(7))
+            cls.create_pg_interfaces(range(8))
             cls.interfaces = list(cls.pg_interfaces[0:4])
 
             for i in cls.interfaces:
@@ -48,6 +48,8 @@ class TestSNAT(VppTestCase):
                 i.admin_up()
                 i.resolve_arp()
 
+            cls.pg7.admin_up()
+
         except Exception:
             super(TestSNAT, cls).tearDownClass()
             raise
@@ -88,7 +90,7 @@ class TestSNAT(VppTestCase):
         :param dst_ip: Destination IP address (Default use global SNAT address)
         """
         if dst_ip is None:
-             dst_ip=self.snat_addr
+            dst_ip = self.snat_addr
         pkts = []
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
@@ -130,13 +132,15 @@ class TestSNAT(VppTestCase):
                     if same_port:
                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                     else:
-                        self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+                        self.assertNotEqual(
+                            packet[TCP].sport, self.tcp_port_in)
                     self.tcp_port_out = packet[TCP].sport
                 elif packet.haslayer(UDP):
                     if same_port:
                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
                     else:
-                        self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+                        self.assertNotEqual(
+                            packet[UDP].sport, self.udp_port_in)
                     self.udp_port_out = packet[UDP].sport
                 else:
                     if same_port:
@@ -145,8 +149,8 @@ class TestSNAT(VppTestCase):
                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                     self.icmp_id_out = packet[ICMP].id
             except:
-                error("Unexpected or invalid packet (outside network):")
-                error(packet.show())
+                self.logger.error(ppp("Unexpected or invalid packet "
+                                      "(outside network):", packet))
                 raise
 
     def verify_capture_in(self, capture, in_if, packet_num=3):
@@ -168,14 +172,18 @@ class TestSNAT(VppTestCase):
                 else:
                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
             except:
-                error("Unexpected or invalid packet (inside network):")
-                error(packet.show())
+                self.logger.error(ppp("Unexpected or invalid packet "
+                                      "(inside network):", packet))
                 raise
 
     def clear_snat(self):
         """
         Clear SNAT configuration.
         """
+        interfaces = self.vapi.snat_interface_addr_dump()
+        for intf in interfaces:
+            self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
+
         interfaces = self.vapi.snat_interface_dump()
         for intf in interfaces:
             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
@@ -215,8 +223,14 @@ class TestSNAT(VppTestCase):
             addr_only = 0
         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
-        self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port,
-                                          addr_only, vrf_id, is_add)
+        self.vapi.snat_add_static_mapping(
+            l_ip,
+            e_ip,
+            local_port,
+            external_port,
+            addr_only,
+            vrf_id,
+            is_add)
 
     def snat_add_address(self, ip, is_add=1):
         """
@@ -241,7 +255,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in
@@ -249,7 +263,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
     def test_static_in(self):
@@ -270,7 +284,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip, True)
 
         # out2in
@@ -278,7 +292,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
     def test_static_out(self):
@@ -299,7 +313,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
         # in2out
@@ -307,7 +321,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip, True)
 
     def test_static_with_port_in(self):
@@ -333,7 +347,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in
@@ -341,7 +355,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
     def test_static_with_port_out(self):
@@ -367,7 +381,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
         # in2out
@@ -375,7 +389,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
     def test_static_vrf_aware(self):
@@ -401,7 +415,7 @@ class TestSNAT(VppTestCase):
         self.pg4.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip1, True)
 
         # inside interface VRF don't match SNAT static mapping VRF (packets
@@ -410,11 +424,12 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
-        self.verify_capture_out(capture, packet_num=0)
+        self.pg3.assert_nothing_captured()
 
     def test_multiple_inside_interfaces(self):
-        """ SNAT multiple inside interfaces with non-overlapping address space """
+        """
+        SNAT multiple inside interfaces with non-overlapping address space
+        """
 
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
@@ -428,7 +443,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 1st interface
@@ -436,7 +451,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
         # in2out 2nd interface
@@ -444,7 +459,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 2nd interface
@@ -452,7 +467,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg1)
 
         # in2out 3rd interface
@@ -460,7 +475,7 @@ class TestSNAT(VppTestCase):
         self.pg2.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 3rd interface
@@ -468,7 +483,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg2.get_capture()
+        capture = self.pg2.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg2)
 
     def test_inside_overlapping_interfaces(self):
@@ -486,7 +501,7 @@ class TestSNAT(VppTestCase):
         self.pg4.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 1st interface
@@ -494,7 +509,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg4.get_capture()
+        capture = self.pg4.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg4)
 
         # in2out 2nd interface
@@ -502,7 +517,7 @@ class TestSNAT(VppTestCase):
         self.pg5.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 2nd interface
@@ -510,7 +525,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg5.get_capture()
+        capture = self.pg5.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg5)
 
         # in2out 3rd interface
@@ -518,7 +533,7 @@ class TestSNAT(VppTestCase):
         self.pg6.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 3rd interface
@@ -526,7 +541,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg6.get_capture()
+        capture = self.pg6.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg6)
 
     def test_hairpinning(self):
@@ -554,8 +569,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
-        self.assertEqual(1, len(capture))
+        capture = self.pg0.get_capture(1)
         p = capture[0]
         try:
             ip = p[IP]
@@ -566,8 +580,7 @@ class TestSNAT(VppTestCase):
             self.assertEqual(tcp.dport, server_in_port)
             host_out_port = tcp.sport
         except:
-            error("Unexpected or invalid packet:")
-            error(p.show())
+            self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
         # send reply from server to host
@@ -577,8 +590,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
-        self.assertEqual(1, len(capture))
+        capture = self.pg0.get_capture(1)
         p = capture[0]
         try:
             ip = p[IP]
@@ -588,10 +600,52 @@ class TestSNAT(VppTestCase):
             self.assertEqual(tcp.sport, server_out_port)
             self.assertEqual(tcp.dport, host_in_port)
         except:
-            error("Unexpected or invalid packet:")
-            error(p.show())
+            self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
+    def test_max_translations_per_user(self):
+        """ MAX translations per user - recycle the least recently used """
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+                                                 is_inside=0)
+
+        # get maximum number of translations per user
+        snat_config = self.vapi.snat_show_config()
+
+        # send more than maximum number of translations per user packets
+        pkts_num = snat_config.max_translations_per_user + 5
+        pkts = []
+        for port in range(0, pkts_num):
+            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+                 TCP(sport=1025 + port))
+            pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        # verify number of translated packet
+        self.pg1.get_capture(pkts_num)
+
+    def test_interface_addr(self):
+        """ Acquire SNAT addresses from interface """
+        self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
+
+        # no address in NAT pool
+        adresses = self.vapi.snat_address_dump()
+        self.assertEqual(0, len(adresses))
+
+        # configure interface address and check NAT address pool
+        self.pg7.config_ip4()
+        adresses = self.vapi.snat_address_dump()
+        self.assertEqual(1, len(adresses))
+
+        # remove interface address and check NAT address pool
+        self.pg7.unconfig_ip4()
+        adresses = self.vapi.snat_address_dump()
+        self.assertEqual(0, len(adresses))
 
     def tearDown(self):
         super(TestSNAT, self).tearDown()