tests: replace pycodestyle with black
[vpp.git] / test / test_gro.py
1 #!/usr/bin/env python3
2 """GRO functional tests"""
3
4 #
5 # Add tests for:
6 # - GRO
7 # - Verify that sending 1500 Bytes frame without GRO enabled correctly
8 # - Verify that sending 1500 Bytes frame with GRO enabled correctly
9 #
10 import unittest
11
12 from scapy.packet import Raw
13 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
14 from scapy.layers.inet6 import ipv6nh, IPerror6
15 from scapy.layers.inet import TCP, ICMP
16 from scapy.data import ETH_P_IP, ETH_P_IPV6, ETH_P_ARP
17
18 from framework import VppTestCase, VppTestRunner
19 from vpp_object import VppObject
20 from vpp_interface import VppInterface
21
22
23 """ Test_gro is a subclass of VPPTestCase classes.
24     GRO tests.
25 """
26
27
28 class TestGRO(VppTestCase):
29     """GRO Test Case"""
30
31     @classmethod
32     def setUpClass(self):
33         super(TestGRO, self).setUpClass()
34         res = self.create_pg_interfaces(range(2))
35         res_gro = self.create_pg_interfaces(range(2, 3), 1, 1460)
36         self.create_pg_interfaces(range(3, 4), 1, 8940)
37         self.pg_interfaces.append(res[0])
38         self.pg_interfaces.append(res[1])
39         self.pg_interfaces.append(res_gro[0])
40         self.pg2.coalesce_enable()
41         self.pg3.coalesce_enable()
42
43     @classmethod
44     def tearDownClass(self):
45         super(TestGRO, self).tearDownClass()
46
47     def setUp(self):
48         super(TestGRO, self).setUp()
49         for i in self.pg_interfaces:
50             i.admin_up()
51             i.config_ip4()
52             i.config_ip6()
53             i.disable_ipv6_ra()
54             i.resolve_arp()
55             i.resolve_ndp()
56
57     def tearDown(self):
58         super(TestGRO, self).tearDown()
59         if not self.vpp_dead:
60             for i in self.pg_interfaces:
61                 i.unconfig_ip4()
62                 i.unconfig_ip6()
63                 i.admin_down()
64
65     def test_gro(self):
66         """GRO test"""
67
68         n_packets = 124
69         #
70         # Send 1500 bytes frame with gro disabled
71         #
72         p4 = (
73             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
74             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags="DF")
75             / TCP(sport=1234, dport=4321)
76             / Raw(b"\xa5" * 1460)
77         )
78
79         rxs = self.send_and_expect(self.pg0, n_packets * p4, self.pg1)
80         for rx in rxs:
81             self.assertEqual(rx[Ether].src, self.pg1.local_mac)
82             self.assertEqual(rx[Ether].dst, self.pg1.remote_mac)
83             self.assertEqual(rx[IP].src, self.pg0.remote_ip4)
84             self.assertEqual(rx[IP].dst, self.pg1.remote_ip4)
85             self.assertEqual(rx[TCP].sport, 1234)
86             self.assertEqual(rx[TCP].dport, 4321)
87
88         #
89         # Send 1500 bytes frame with gro enabled on
90         # output interfaces support GRO
91         #
92         p = []
93         s = 0
94         for n in range(0, n_packets):
95             p.append(
96                 (
97                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
98                     / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4, flags="DF")
99                     / TCP(sport=1234, dport=4321, seq=s, ack=n, flags="A")
100                     / Raw(b"\xa5" * 1460)
101                 )
102             )
103             s += 1460
104
105         rxs = self.send_and_expect(self.pg0, p, self.pg2, n_rx=2)
106
107         i = 0
108         for rx in rxs:
109             i += 1
110             self.assertEqual(rx[Ether].src, self.pg2.local_mac)
111             self.assertEqual(rx[Ether].dst, self.pg2.remote_mac)
112             self.assertEqual(rx[IP].src, self.pg0.remote_ip4)
113             self.assertEqual(rx[IP].dst, self.pg2.remote_ip4)
114             self.assertEqual(rx[IP].len, 64280)  # 1460 * 44 + 40 < 65536
115             self.assertEqual(rx[TCP].sport, 1234)
116             self.assertEqual(rx[TCP].dport, 4321)
117             self.assertEqual(rx[TCP].ack, (44 * i - 1))
118
119         p4_temp = (
120             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
121             / IP(src=self.pg2.remote_ip4, dst=self.pg0.remote_ip4, flags="DF")
122             / TCP(sport=1234, dport=4321, flags="F")
123         )
124
125         rxs = self.send_and_expect(self.pg2, 100 * [p4_temp], self.pg0, n_rx=100)
126         rx_coalesce = self.pg2.get_capture(1, timeout=1)
127
128         rx0 = rx_coalesce[0]
129         self.assertEqual(rx0[Ether].src, self.pg2.local_mac)
130         self.assertEqual(rx0[Ether].dst, self.pg2.remote_mac)
131         self.assertEqual(rx0[IP].src, self.pg0.remote_ip4)
132         self.assertEqual(rx0[IP].dst, self.pg2.remote_ip4)
133         self.assertEqual(rx0[IP].len, 52600)  # 1460 * 36 + 40
134         self.assertEqual(rx0[TCP].sport, 1234)
135         self.assertEqual(rx0[TCP].dport, 4321)
136
137         for rx in rxs:
138             self.assertEqual(rx[Ether].src, self.pg0.local_mac)
139             self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
140             self.assertEqual(rx[IP].src, self.pg2.remote_ip4)
141             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
142             self.assertEqual(rx[IP].len, 40)
143             self.assertEqual(rx[TCP].sport, 1234)
144             self.assertEqual(rx[TCP].dport, 4321)
145
146         #
147         # Same test with IPv6
148         #
149         p = []
150         s = 0
151         for n in range(0, 88):
152             p.append(
153                 (
154                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
155                     / IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
156                     / TCP(sport=1234, dport=4321, seq=s, ack=n, flags="A")
157                     / Raw(b"\xa5" * 1460)
158                 )
159             )
160             s += 1460
161         p[-1][TCP].flags = "AP"  # push to flush second packet
162
163         rxs = self.send_and_expect(self.pg0, p, self.pg2, n_rx=2)
164
165         i = 0
166         for rx in rxs:
167             i += 1
168             self.assertEqual(rx[Ether].src, self.pg2.local_mac)
169             self.assertEqual(rx[Ether].dst, self.pg2.remote_mac)
170             self.assertEqual(rx[IPv6].src, self.pg0.remote_ip6)
171             self.assertEqual(rx[IPv6].dst, self.pg2.remote_ip6)
172             self.assertEqual(rx[IPv6].plen, 64260)  # 1460 * 44 + 20 < 65536
173             self.assertEqual(rx[TCP].sport, 1234)
174             self.assertEqual(rx[TCP].dport, 4321)
175             self.assertEqual(rx[TCP].ack, (44 * i - 1))
176
177         #
178         # Send a series of 1500 bytes packets each followed by a packet with a
179         # PSH flag. Verify that GRO stops everytime a PSH flag is encountered
180         #
181         p = []
182         s = 0
183         for n in range(0, n_packets):
184             p.append(
185                 (
186                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
187                     / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4, flags="DF")
188                     / TCP(sport=1234, dport=4321, seq=s, ack=2 * n, flags="A")
189                     / Raw(b"\xa5" * 1460)
190                 )
191             )
192             s += 1460
193             p.append(
194                 (
195                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
196                     / IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4, flags="DF")
197                     / TCP(sport=1234, dport=4321, seq=s, ack=2 * n + 1, flags="AP")
198                     / Raw(b"\xa5" * 1340)
199                 )
200             )
201             s += 1340
202
203         rxs = self.send_and_expect(self.pg0, p, self.pg2, n_rx=n_packets)
204
205         i = 0
206         for rx in rxs:
207             self.assertEqual(rx[Ether].src, self.pg2.local_mac)
208             self.assertEqual(rx[Ether].dst, self.pg2.remote_mac)
209             self.assertEqual(rx[IP].src, self.pg0.remote_ip4)
210             self.assertEqual(rx[IP].dst, self.pg2.remote_ip4)
211             self.assertEqual(rx[IP].len, 40 + 1460 + 1340)
212             self.assertEqual(rx[TCP].sport, 1234)
213             self.assertEqual(rx[TCP].dport, 4321)
214             self.assertEqual(rx[TCP].ack, (2 * i + 1))
215             i += 1
216
217         #
218         # Send a series of 1500 bytes packets each followed by a short packet
219         # with padding. Verify that GRO removes the padding and stops on short
220         # packets
221         #
222         p = []
223         s = 0
224         for n in range(0, n_packets):
225             i = self.pg0
226             p.append(
227                 (
228                     Ether(src=i.remote_mac, dst=i.local_mac)
229                     / IP(src=i.remote_ip4, dst=self.pg2.remote_ip4, flags="DF")
230                     / TCP(sport=1234, dport=4321, seq=s, ack=2 * n, flags="A")
231                     / Raw(b"\xa5" * 1459)
232                 )
233             )
234             s += 1459
235             p2 = (
236                 Ether(src=i.remote_mac, dst=i.local_mac)
237                 / IP(src=i.remote_ip4, dst=self.pg2.remote_ip4, flags="DF", len=41)
238                 / TCP(sport=1234, dport=4321, seq=s, ack=2 * n + 1, flags="A")
239                 / Raw(b"\xa5")
240             )
241             # first compute csum of pkt w/o padding to work around scapy bug
242             p2 = Ether(bytes(p2))
243             p.append(p2 / Raw(b"\xa5" * 5))  # 1 byte data + 5 bytes padding
244             s += 1
245
246         rxs = self.send_and_expect(self.pg0, p, self.pg2, n_rx=n_packets)
247
248         i = 0
249         for rx in rxs:
250             self.assertEqual(rx[Ether].src, self.pg2.local_mac)
251             self.assertEqual(rx[Ether].dst, self.pg2.remote_mac)
252             self.assertEqual(rx[IP].src, self.pg0.remote_ip4)
253             self.assertEqual(rx[IP].dst, self.pg2.remote_ip4)
254             self.assertEqual(rx[IP].len, 40 + 1459 + 1)
255             self.assertEqual(rx[TCP].sport, 1234)
256             self.assertEqual(rx[TCP].dport, 4321)
257             self.assertEqual(rx[TCP].ack, (2 * i + 1))
258             i += 1
259
260
261 if __name__ == "__main__":
262     unittest.main(testRunner=VppTestRunner)