5 from framework import VppTestCase
6 from asfframework import VppTestRunner
8 from scapy.layers.inet import IP, TCP
9 from scapy.layers.inet6 import IPv6
10 from scapy.layers.l2 import Ether
11 from scapy.packet import Raw
14 class TestMSSClamp(VppTestCase):
15 """TCP MSS Clamping Test Case"""
18 super(TestMSSClamp, self).setUp()
20 # create 2 pg interfaces
21 self.create_pg_interfaces(range(2))
23 for i in self.pg_interfaces:
31 for i in self.pg_interfaces:
35 super(TestMSSClamp, self).tearDown()
37 def verify_pkt(self, rx, expected_mss):
38 # check that the MSS size equals the expected value
39 # and the IP and TCP checksums are correct
45 ip_csum = rx[IP].chksum
49 self.assertEqual(opt[0][0], "MSS")
50 self.assertEqual(opt[0][1], expected_mss)
51 # recalculate checksums
52 rx = rx.__class__(bytes(rx))
54 self.assertEqual(tcp_csum, tcp.chksum)
56 self.assertEqual(ip_csum, rx[IP].chksum)
58 def send_and_verify_ip4(self, src_pg, dst_pg, mss, expected_mss):
59 # IPv4 TCP packet with the requested MSS option,
60 # from a host on src_pg to a host on dst_pg.
62 Ether(dst=src_pg.local_mac, src=src_pg.remote_mac)
63 / IP(src=src_pg.remote_ip4, dst=dst_pg.remote_ip4)
68 options=[("MSS", (mss)), ("EOL", None)],
73 rxs = self.send_and_expect(src_pg, p * 65, dst_pg)
76 self.verify_pkt(rx, expected_mss)
78 def send_and_verify_ip6(self, src_pg, dst_pg, mss, expected_mss):
80 # IPv6 TCP packet with the requested MSS option,
81 # from a host on src_pg to a host on dst_pg.
84 Ether(dst=src_pg.local_mac, src=src_pg.remote_mac)
85 / IPv6(src=src_pg.remote_ip6, dst=dst_pg.remote_ip6)
90 options=[("MSS", (mss)), ("EOL", None)],
95 rxs = self.send_and_expect(src_pg, p * 65, dst_pg)
98 self.verify_pkt(rx, expected_mss)
100 def test_tcp_mss_clamping_ip4_tx(self):
101 """IP4 TCP MSS Clamping TX"""
103 # enable the TCP MSS clamping feature to lower the MSS to 1424.
104 self.vapi.mss_clamp_enable_disable(
105 self.pg1.sw_if_index,
112 # Verify that the feature is enabled.
113 rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index)
114 self.assertEqual(reply[0].ipv4_mss, 1424)
115 self.assertEqual(reply[0].ipv4_direction, 3)
117 # Send syn packets and verify that the MSS value is lowered.
118 self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1424)
121 stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip4-out/clamped")
122 self.assertEqual(sum(stats), 65)
124 # Send syn packets with small enough MSS values and verify they are unchanged.
126 self.send_and_verify_ip4(self.pg0, self.pg1, 1400, 1400)
128 # enable the feature only in TX direction
129 # and change the max MSS value
130 self.vapi.mss_clamp_enable_disable(
131 self.pg1.sw_if_index,
138 # Send syn packets and verify that the MSS value is lowered.
139 self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1420)
141 # enable the feature only in RX direction
142 self.vapi.mss_clamp_enable_disable(
143 self.pg1.sw_if_index,
150 # Send the packets again and ensure they are unchanged.
151 self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1460)
153 # disable the feature
154 self.vapi.mss_clamp_enable_disable(
155 self.pg1.sw_if_index,
162 # Send the packets again and ensure they are unchanged.
163 self.send_and_verify_ip4(self.pg0, self.pg1, 1460, 1460)
165 def test_tcp_mss_clamping_ip4_rx(self):
166 """IP4 TCP MSS Clamping RX"""
168 # enable the TCP MSS clamping feature to lower the MSS to 1424.
169 self.vapi.mss_clamp_enable_disable(
170 self.pg1.sw_if_index,
177 # Verify that the feature is enabled.
178 rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index)
179 self.assertEqual(reply[0].ipv4_mss, 1424)
180 self.assertEqual(reply[0].ipv4_direction, 3)
182 # Send syn packets and verify that the MSS value is lowered.
183 self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1424)
186 stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip4-in/clamped")
187 self.assertEqual(sum(stats), 65)
189 # Send syn packets with small enough MSS values and verify they are unchanged.
191 self.send_and_verify_ip4(self.pg1, self.pg0, 1400, 1400)
193 # enable the feature only in RX direction
194 # and change the max MSS value
195 self.vapi.mss_clamp_enable_disable(
196 self.pg1.sw_if_index,
203 # Send syn packets and verify that the MSS value is lowered.
204 self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1420)
206 # enable the feature only in TX direction
207 self.vapi.mss_clamp_enable_disable(
208 self.pg1.sw_if_index,
215 # Send the packets again and ensure they are unchanged.
216 self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1460)
218 # disable the feature
219 self.vapi.mss_clamp_enable_disable(
220 self.pg1.sw_if_index,
227 # Send the packets again and ensure they are unchanged.
228 self.send_and_verify_ip4(self.pg1, self.pg0, 1460, 1460)
230 def test_tcp_mss_clamping_ip6_tx(self):
231 """IP6 TCP MSS Clamping TX"""
233 # enable the TCP MSS clamping feature to lower the MSS to 1424.
234 self.vapi.mss_clamp_enable_disable(
235 self.pg1.sw_if_index,
242 # Verify that the feature is enabled.
243 rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index)
244 self.assertEqual(reply[0].ipv6_mss, 1424)
245 self.assertEqual(reply[0].ipv6_direction, 3)
247 # Send syn packets and verify that the MSS value is lowered.
248 self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1424)
251 stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip6-out/clamped")
252 self.assertEqual(sum(stats), 65)
254 # Send syn packets with small enough MSS values and verify they are unchanged.
256 self.send_and_verify_ip6(self.pg0, self.pg1, 1400, 1400)
258 # enable the feature only in TX direction
259 # and change the max MSS value
260 self.vapi.mss_clamp_enable_disable(
261 self.pg1.sw_if_index,
268 # Send syn packets and verify that the MSS value is lowered.
269 self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1420)
271 # enable the feature only in RX direction
272 self.vapi.mss_clamp_enable_disable(
273 self.pg1.sw_if_index,
280 # Send the packets again and ensure they are unchanged.
281 self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1460)
283 # disable the feature
284 self.vapi.mss_clamp_enable_disable(
285 self.pg1.sw_if_index,
292 # Send the packets again and ensure they are unchanged.
293 self.send_and_verify_ip6(self.pg0, self.pg1, 1460, 1460)
295 def test_tcp_mss_clamping_ip6_rx(self):
296 """IP6 TCP MSS Clamping RX"""
298 # enable the TCP MSS clamping feature to lower the MSS to 1424.
299 self.vapi.mss_clamp_enable_disable(
300 self.pg1.sw_if_index,
307 # Verify that the feature is enabled.
308 rv, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index)
309 self.assertEqual(reply[0].ipv6_mss, 1424)
310 self.assertEqual(reply[0].ipv6_direction, 3)
312 # Send syn packets and verify that the MSS value is lowered.
313 self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1424)
316 stats = self.statistics.get_counter("/err/tcp-mss-clamping-ip6-in/clamped")
317 self.assertEqual(sum(stats), 65)
319 # Send syn packets with small enough MSS values and verify they are unchanged.
321 self.send_and_verify_ip6(self.pg1, self.pg0, 1400, 1400)
323 # enable the feature only in RX direction
324 # and change the max MSS value
325 self.vapi.mss_clamp_enable_disable(
326 self.pg1.sw_if_index,
333 # Send syn packets and verify that the MSS value is lowered.
334 self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1420)
336 # enable the feature only in TX direction
337 self.vapi.mss_clamp_enable_disable(
338 self.pg1.sw_if_index,
345 # Send the packets again and ensure they are unchanged.
346 self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1460)
348 # disable the feature
349 self.vapi.mss_clamp_enable_disable(
350 self.pg1.sw_if_index,
357 # Send the packets again and ensure they are unchanged.
358 self.send_and_verify_ip6(self.pg1, self.pg0, 1460, 1460)
361 if __name__ == "__main__":
362 unittest.main(testRunner=VppTestRunner)