tests: replace pycodestyle with black
[vpp.git] / test / test_mss_clamp.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6
7 from scapy.layers.inet import IP, TCP
8 from scapy.layers.inet6 import IPv6
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11
12
class TestMSSClamp(VppTestCase):
    """TCP MSS Clamping Test Case"""

    # Values passed to mss_clamp_enable_disable as ipv4_direction /
    # ipv6_direction.  The scenarios below use 1 to clamp only traffic
    # received on the interface, 2 to clamp only traffic transmitted on it,
    # 3 for both, and 0 to switch the feature off.
    MSS_CLAMP_DIR_NONE = 0
    MSS_CLAMP_DIR_RX = 1
    MSS_CLAMP_DIR_TX = 2
    MSS_CLAMP_DIR_BOTH = MSS_CLAMP_DIR_RX | MSS_CLAMP_DIR_TX

    # Number of SYN packets sent per burst.  The "clamped" counter is
    # asserted to equal this value after the first clamped burst.
    SYN_BURST_SIZE = 65

    def setUp(self):
        """Create two pg interfaces and bring them up with IPv4 and IPv6."""
        super(TestMSSClamp, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Remove the addresses and bring the pg interfaces down again."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestMSSClamp, self).tearDown()

    def verify_pkt(self, rx, expected_mss):
        """Check the MSS option and the checksums of a received packet.

        The first TCP option must be an MSS option carrying
        ``expected_mss``.  The TCP checksum (and the IP checksum, when an
        IPv4 header is present) found in ``rx`` must equal the value scapy
        computes when the packet is rebuilt from its bytes.

        :param rx: received scapy packet containing a TCP layer.
        :param expected_mss: MSS value the packet is expected to carry.
        """
        # Remember the checksums that arrived on the wire, then delete them
        # so that rebuilding the packet makes scapy recalculate them.
        tcp = rx[TCP]
        tcp_csum = tcp.chksum
        del tcp.chksum
        ip_csum = 0
        if rx.haslayer(IP):
            ip_csum = rx[IP].chksum
            del rx[IP].chksum

        opt = tcp.options
        self.assertEqual(opt[0][0], "MSS")
        self.assertEqual(opt[0][1], expected_mss)

        # recalculate checksums
        rx = rx.__class__(bytes(rx))
        tcp = rx[TCP]
        self.assertEqual(tcp_csum, tcp.chksum)
        if rx.haslayer(IP):
            self.assertEqual(ip_csum, rx[IP].chksum)

    def _send_and_verify(self, l3_hdr, src_pg, dst_pg, mss, expected_mss):
        """Send a burst of TCP SYNs behind ``l3_hdr`` and verify each reply.

        :param l3_hdr: scapy IP or IPv6 header to place under the TCP layer.
        :param src_pg: pg interface the packets are injected on.
        :param dst_pg: pg interface the packets are expected on.
        :param mss: MSS option value placed in the sent SYN.
        :param expected_mss: MSS option value expected in received packets.
        """
        p = (
            Ether(dst=src_pg.local_mac, src=src_pg.remote_mac)
            / l3_hdr
            / TCP(
                sport=1234,
                dport=1234,
                flags="S",
                options=[("MSS", mss), ("EOL", None)],
            )
            # bytes, not str: a str payload would be text-encoded by scapy
            # under Python 3 and would no longer be 100 bytes of 0xa5.
            / Raw(b"\xa5" * 100)
        )

        rxs = self.send_and_expect(src_pg, p * self.SYN_BURST_SIZE, dst_pg)

        for rx in rxs:
            self.verify_pkt(rx, expected_mss)

    def send_and_verify_ip4(self, src_pg, dst_pg, mss, expected_mss):
        """Send IPv4 TCP SYNs with the requested MSS and verify the result.

        The packets go from a host on src_pg to a host on dst_pg.
        """
        self._send_and_verify(
            IP(src=src_pg.remote_ip4, dst=dst_pg.remote_ip4),
            src_pg,
            dst_pg,
            mss,
            expected_mss,
        )

    def send_and_verify_ip6(self, src_pg, dst_pg, mss, expected_mss):
        """Send IPv6 TCP SYNs with the requested MSS and verify the result.

        The packets go from a host on src_pg to a host on dst_pg.
        """
        self._send_and_verify(
            IPv6(src=src_pg.remote_ip6, dst=dst_pg.remote_ip6),
            src_pg,
            dst_pg,
            mss,
            expected_mss,
        )

    def _set_clamp(self, family, mss, direction):
        """Configure MSS clamping on pg1 for one address family.

        The settings of the other address family are always passed as 0.

        :param family: "ipv4" or "ipv6".
        :param mss: maximum MSS value to configure for ``family``.
        :param direction: one of the MSS_CLAMP_DIR_* values.
        """
        args = {
            "ipv4_mss": 0,
            "ipv6_mss": 0,
            "ipv4_direction": self.MSS_CLAMP_DIR_NONE,
            "ipv6_direction": self.MSS_CLAMP_DIR_NONE,
        }
        args[family + "_mss"] = mss
        args[family + "_direction"] = direction
        self.vapi.mss_clamp_enable_disable(self.pg1.sw_if_index, **args)

    def _check_clamping(
        self, family, src_pg, dst_pg, clamp_dir, other_dir, counter
    ):
        """Run the common clamping scenario for one family and direction.

        :param family: "ipv4" or "ipv6".
        :param src_pg: pg interface the SYN packets are injected on.
        :param dst_pg: pg interface the SYN packets are expected on.
        :param clamp_dir: direction value under which the traffic sent from
            src_pg to dst_pg is expected to be clamped.
        :param other_dir: direction value under which that same traffic is
            expected to pass unchanged.
        :param counter: statistics path of the "clamped" counter to check.
        """
        send_and_verify = {
            "ipv4": self.send_and_verify_ip4,
            "ipv6": self.send_and_verify_ip6,
        }[family]

        # enable the TCP MSS clamping feature to lower the MSS to 1424.
        self._set_clamp(family, 1424, self.MSS_CLAMP_DIR_BOTH)

        # Verify that the feature is enabled.
        _, reply = self.vapi.mss_clamp_get(sw_if_index=self.pg1.sw_if_index)
        self.assertEqual(getattr(reply[0], family + "_mss"), 1424)
        self.assertEqual(
            getattr(reply[0], family + "_direction"), self.MSS_CLAMP_DIR_BOTH
        )

        # Send syn packets and verify that the MSS value is lowered.
        send_and_verify(src_pg, dst_pg, 1460, 1424)

        # check the stats
        stats = self.statistics.get_counter(counter)
        self.assertEqual(sum(stats), self.SYN_BURST_SIZE)

        # Send syn packets with small enough MSS values and verify they are
        # unchanged.
        send_and_verify(src_pg, dst_pg, 1400, 1400)

        # enable the feature only in the direction under test
        # and change the max MSS value
        self._set_clamp(family, 1420, clamp_dir)

        # Send syn packets and verify that the MSS value is lowered.
        send_and_verify(src_pg, dst_pg, 1460, 1420)

        # enable the feature only in the opposite direction
        self._set_clamp(family, 1424, other_dir)

        # Send the packets again and ensure they are unchanged.
        send_and_verify(src_pg, dst_pg, 1460, 1460)

        # disable the feature
        self._set_clamp(family, 0, self.MSS_CLAMP_DIR_NONE)

        # Send the packets again and ensure they are unchanged.
        send_and_verify(src_pg, dst_pg, 1460, 1460)

    def test_tcp_mss_clamping_ip4_tx(self):
        """IP4 TCP MSS Clamping TX"""
        self._check_clamping(
            "ipv4",
            src_pg=self.pg0,
            dst_pg=self.pg1,
            clamp_dir=self.MSS_CLAMP_DIR_TX,
            other_dir=self.MSS_CLAMP_DIR_RX,
            counter="/err/tcp-mss-clamping-ip4-out/clamped",
        )

    def test_tcp_mss_clamping_ip4_rx(self):
        """IP4 TCP MSS Clamping RX"""
        self._check_clamping(
            "ipv4",
            src_pg=self.pg1,
            dst_pg=self.pg0,
            clamp_dir=self.MSS_CLAMP_DIR_RX,
            other_dir=self.MSS_CLAMP_DIR_TX,
            counter="/err/tcp-mss-clamping-ip4-in/clamped",
        )

    def test_tcp_mss_clamping_ip6_tx(self):
        """IP6 TCP MSS Clamping TX"""
        self._check_clamping(
            "ipv6",
            src_pg=self.pg0,
            dst_pg=self.pg1,
            clamp_dir=self.MSS_CLAMP_DIR_TX,
            other_dir=self.MSS_CLAMP_DIR_RX,
            counter="/err/tcp-mss-clamping-ip6-out/clamped",
        )

    def test_tcp_mss_clamping_ip6_rx(self):
        """IP6 TCP MSS Clamping RX"""
        self._check_clamping(
            "ipv6",
            src_pg=self.pg1,
            dst_pg=self.pg0,
            clamp_dir=self.MSS_CLAMP_DIR_RX,
            other_dir=self.MSS_CLAMP_DIR_TX,
            counter="/err/tcp-mss-clamping-ip6-in/clamped",
        )
358
359
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)