NAT: TCP MSS clamping
[vpp.git] / test / test_nat.py
index 5baadd8..a705bd9 100644 (file)
@@ -139,6 +139,7 @@ class MethodHolder(VppTestCase):
         self.verify_no_nat44_user()
         self.vapi.nat_set_timeouts()
         self.vapi.nat_set_addr_and_port_alloc_alg()
+        self.vapi.nat_set_mss_clamping()
 
     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                                  local_port=0, external_port=0, vrf_id=0,
@@ -1002,6 +1003,21 @@ class MethodHolder(VppTestCase):
         # sourceIPv4Address
         self.assertEqual(src_addr, record[8])
 
+    def verify_mss_value(self, pkt, mss):
+        """
+        Verify TCP MSS value
+
+        :param pkt:
+        :param mss:
+        """
+        if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
+            raise TypeError("Not a TCP/IP packet")
+
+        for option in pkt[TCP].options:
+            if option[0] == 'MSS':
+                self.assertEqual(option[1], mss)
+                self.assert_tcp_checksum_valid(pkt)
+
 
 class TestNAT44(MethodHolder):
     """ NAT44 Test Cases """
@@ -3295,6 +3311,42 @@ class TestNAT44(MethodHolder):
             nsessions = nsessions + user.nsessions
         self.assertLess(nsessions, 2 * max_sessions)
 
+    def test_mss_clamping(self):
+        """ TCP MSS clamping """
+        self.nat44_add_address(self.nat_addr)
+        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+                                                  is_inside=0)
+
+        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+                 flags="S", options=[('MSS', 1400)]))
+
+        self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
+        self.pg0.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(1)
+        # Negotiated MSS value greater than configured - changed
+        self.verify_mss_value(capture[0], 1000)
+
+        self.vapi.nat_set_mss_clamping(enable=0)
+        self.pg0.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(1)
+        # MSS clamping disabled - negotiated MSS unchanged
+        self.verify_mss_value(capture[0], 1400)
+
+        self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
+        self.pg0.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        capture = self.pg1.get_capture(1)
+        # Negotiated MSS value smaller than configured - unchanged
+        self.verify_mss_value(capture[0], 1400)
+
     def tearDown(self):
         super(TestNAT44, self).tearDown()
         if not self.vpp_dead: