vxlan unit test - minor fixes
[vpp.git] / test / test_snat.py
index 5cc76f6..ca2e52a 100644 (file)
@@ -2,12 +2,12 @@
 
 import socket
 import unittest
-from logging import *
 
 from framework import VppTestCase, VppTestRunner
 
 from scapy.layers.inet import IP, TCP, UDP, ICMP
 from scapy.layers.l2 import Ether
+from util import ppp
 
 
 class TestSNAT(VppTestCase):
@@ -88,7 +88,7 @@ class TestSNAT(VppTestCase):
         :param dst_ip: Destination IP address (Default use global SNAT address)
         """
         if dst_ip is None:
-             dst_ip=self.snat_addr
+            dst_ip = self.snat_addr
         pkts = []
         # TCP
         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
@@ -145,8 +145,8 @@ class TestSNAT(VppTestCase):
                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                     self.icmp_id_out = packet[ICMP].id
             except:
-                error("Unexpected or invalid packet (outside network):")
-                error(packet.show())
+                self.logger.error(ppp("Unexpected or invalid packet "
+                                      "(outside network):", packet))
                 raise
 
     def verify_capture_in(self, capture, in_if, packet_num=3):
@@ -168,8 +168,8 @@ class TestSNAT(VppTestCase):
                 else:
                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
             except:
-                error("Unexpected or invalid packet (inside network):")
-                error(packet.show())
+                self.logger.error(ppp("Unexpected or invalid packet "
+                                      "(inside network):", packet))
                 raise
 
     def clear_snat(self):
@@ -241,7 +241,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in
@@ -249,7 +249,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
     def test_static_in(self):
@@ -270,7 +270,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip, True)
 
         # out2in
@@ -278,7 +278,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
     def test_static_out(self):
@@ -299,7 +299,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
         # in2out
@@ -307,7 +307,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip, True)
 
     def test_static_with_port_in(self):
@@ -333,7 +333,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in
@@ -341,7 +341,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
     def test_static_with_port_out(self):
@@ -367,7 +367,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
         # in2out
@@ -375,7 +375,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
     def test_static_vrf_aware(self):
@@ -401,7 +401,7 @@ class TestSNAT(VppTestCase):
         self.pg4.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture, nat_ip1, True)
 
         # inside interface VRF don't match SNAT static mapping VRF (packets
@@ -410,11 +410,10 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
-        self.verify_capture_out(capture, packet_num=0)
+        self.pg3.assert_nothing_captured()
 
     def test_multiple_inside_interfaces(self):
-        """ SNAT multiple inside interfaces with non-overlapping address space """
+        """SNAT multiple inside interfaces with non-overlapping address space"""
 
         self.snat_add_address(self.snat_addr)
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
@@ -428,7 +427,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 1st interface
@@ -436,7 +435,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
+        capture = self.pg0.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg0)
 
         # in2out 2nd interface
@@ -444,7 +443,7 @@ class TestSNAT(VppTestCase):
         self.pg1.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 2nd interface
@@ -452,7 +451,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg1.get_capture()
+        capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg1)
 
         # in2out 3rd interface
@@ -460,7 +459,7 @@ class TestSNAT(VppTestCase):
         self.pg2.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 3rd interface
@@ -468,7 +467,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg2.get_capture()
+        capture = self.pg2.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg2)
 
     def test_inside_overlapping_interfaces(self):
@@ -486,7 +485,7 @@ class TestSNAT(VppTestCase):
         self.pg4.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 1st interface
@@ -494,7 +493,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg4.get_capture()
+        capture = self.pg4.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg4)
 
         # in2out 2nd interface
@@ -502,7 +501,7 @@ class TestSNAT(VppTestCase):
         self.pg5.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 2nd interface
@@ -510,7 +509,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg5.get_capture()
+        capture = self.pg5.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg5)
 
         # in2out 3rd interface
@@ -518,7 +517,7 @@ class TestSNAT(VppTestCase):
         self.pg6.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg3.get_capture()
+        capture = self.pg3.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
         # out2in 3rd interface
@@ -526,7 +525,7 @@ class TestSNAT(VppTestCase):
         self.pg3.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg6.get_capture()
+        capture = self.pg6.get_capture(len(pkts))
         self.verify_capture_in(capture, self.pg6)
 
     def test_hairpinning(self):
@@ -554,8 +553,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
-        self.assertEqual(1, len(capture))
+        capture = self.pg0.get_capture(1)
         p = capture[0]
         try:
             ip = p[IP]
@@ -566,8 +564,7 @@ class TestSNAT(VppTestCase):
             self.assertEqual(tcp.dport, server_in_port)
             host_out_port = tcp.sport
         except:
-            error("Unexpected or invalid packet:")
-            error(p.show())
+            self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
         # send reply from server to host
@@ -577,8 +574,7 @@ class TestSNAT(VppTestCase):
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
-        capture = self.pg0.get_capture()
-        self.assertEqual(1, len(capture))
+        capture = self.pg0.get_capture(1)
         p = capture[0]
         try:
             ip = p[IP]
@@ -588,10 +584,34 @@ class TestSNAT(VppTestCase):
             self.assertEqual(tcp.sport, server_out_port)
             self.assertEqual(tcp.dport, host_in_port)
         except:
-            error("Unexpected or invalid packet:")
-            error(p.show())
+            self.logger.error(ppp("Unexpected or invalid packet:"), p)
             raise
 
+    def test_max_translations_per_user(self):
+        """ MAX translations per user - recycle the least recently used """
+
+        self.snat_add_address(self.snat_addr)
+        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+                                                 is_inside=0)
+
+        # get maximum number of translations per user
+        snat_config = self.vapi.snat_show_config()
+
+        # send more than maximum number of translations per user packets
+        pkts_num = snat_config.max_translations_per_user + 5
+        pkts = []
+        for port in range(0, pkts_num):
+            p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+                 TCP(sport=1025 + port))
+            pkts.append(p)
+        self.pg0.add_stream(pkts)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        # verify number of translated packet
+        self.pg1.get_capture(pkts_num)
 
     def tearDown(self):
         super(TestSNAT, self).tearDown()