http: ignore http_proxy env in tests
[vpp.git] / test / test_mtu.py
1 #!/usr/bin/env python3
2 """IP4 and IP6 MTU functional tests"""
3
4 #
5 # Add tests for:
6 # - sub interfaces
7 # - Verify that adjacencies inherit MTU correctly
8 # - Verify that sub-interfaces inherit MTU correctly
9 # - Different types of interfaces?
10 #
11 import unittest
12 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
13 from scapy.layers.inet import ICMP
14 from framework import VppTestCase
15 from asfframework import VppTestRunner
16 from util import reassemble4
17
18
19 """ TestMTU is a subclass of VppTestCase.
20     It holds the IPv4 and IPv6 MTU tests.
21 """
22
23
class TestMTU(VppTestCase):
    """MTU Test Case

    Functional tests for IPv4 and IPv6 MTU handling. Traffic is sent into
    pg0 towards pg1 while the MTU configured on pg1 is lowered, and the
    resulting ICMP errors / fragments are checked.
    """

    # Show full diffs when a packet comparison fails.
    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """Create the two packet-generator interfaces used by every test."""
        super().setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super().tearDownClass()

    def setUp(self):
        """Bring every interface up with IPv4/IPv6 config and resolved neighbours."""
        super().setUp()
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.config_ip6()
            i.disable_ipv6_ra()
            i.resolve_arp()
            i.resolve_ndp()

    def tearDown(self):
        """Remove the IP configuration and bring interfaces down, if VPP is alive."""
        super().tearDown()
        if not self.vpp_dead:
            for i in self.pg_interfaces:
                i.unconfig_ip4()
                i.unconfig_ip6()
                i.admin_down()

    def validate(self, rx, expected):
        """Assert that *rx* equals *expected* after re-creating *expected*.

        *expected* is passed back through its own class constructor before
        the comparison (presumably so scapy fills in computed fields such as
        lengths and checksums -- confirm against scapy's Packet docs).
        """
        self.assertEqual(rx, expected.__class__(expected))

    def validate_bytes(self, rx, expected):
        """Assert that two already-serialised packets are identical."""
        self.assertEqual(rx, expected)

    def payload(self, len):
        """Return a string of ``len`` "x" characters.

        The parameter shadows the ``len`` builtin; the name is kept so that
        existing keyword callers keep working.
        """
        return "x" * len

    def get_mtu(self, sw_if_index):
        """Return the first MTU entry reported for *sw_if_index*.

        Returns 0 when the interface dump contains no matching interface.
        """
        rv = self.vapi.sw_interface_dump(sw_if_index=sw_if_index)
        for i in rv:
            if i.sw_if_index == sw_if_index:
                return i.mtu[0]
        return 0

    def _set_mtu(self, sw_if_index, mtu):
        """Set the first MTU entry of an interface; the other three are zeroed."""
        self.vapi.sw_interface_set_mtu(sw_if_index, [mtu, 0, 0, 0])

    def _restore_mtu(self, sw_if_index, mtu):
        """Put an interface's MTU back, unless VPP has already died.

        Called from ``finally`` blocks: if VPP is dead the API call would
        only raise a second error that hides the real test failure.
        """
        if not self.vpp_dead:
            self._set_mtu(sw_if_index, mtu)

    def test_ip4_mtu(self):
        """IP4 MTU test"""

        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags="DF")

        current_mtu = self.get_mtu(self.pg1.sw_if_index)

        # Largest UDP payload that fits the current MTU:
        # 20 bytes of IPv4 header and 8 bytes of UDP header are subtracted.
        p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 20 - 8)

        # Baseline: at the unmodified MTU every packet is forwarded to pg1
        # with only the TTL decremented.
        p4 = p_ether / p_ip4 / p_payload
        p4_reply = p_ip4 / p_payload
        p4_reply.ttl -= 1
        rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1)
        for p in rx:
            self.validate(p[1], p4_reply)

        # Everything below runs with a lowered MTU on pg1. The original MTU
        # is restored in ``finally`` so that a failing assertion cannot leak
        # the 576-byte MTU into the tests that run afterwards.
        try:
            # MTU
            self._set_mtu(self.pg1.sw_if_index, 576)
            self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index))

            # Should fail. Too large MTU
            # NOTE(review): the checksum is hard-coded, presumably because
            # the expected reply is truncated to 576 bytes below -- confirm.
            p_icmp4 = ICMP(
                type="dest-unreach",
                code="fragmentation-needed",
                nexthopmtu=576,
                chksum=0x2DBB,
            )
            icmp4_reply = (
                IP(
                    src=self.pg0.local_ip4,
                    dst=self.pg0.remote_ip4,
                    ttl=255,
                    len=576,
                    id=0,
                )
                / p_icmp4
                / p_ip4
                / p_payload
            )
            # Only the first 576 bytes of the error message are compared.
            icmp4_reply_bytes = bytes(icmp4_reply)[0:576]
            rx = self.send_and_expect_some(self.pg0, p4 * 11, self.pg0)
            for p in rx:
                self.validate_bytes(bytes(p[1]), icmp4_reply_bytes)

            self.assert_error_counter_equal("/err/ip4-input/mtu_exceeded", 11)

            # Now with DF off. Expect fragments.
            # First go with 1500 byte packets.
            p_payload = UDP(sport=1234, dport=1234) / self.payload(1500 - 20 - 8)
            p4 = p_ether / p_ip4 / p_payload
            p4.flags = 0
            p4_reply = p_ip4 / p_payload
            p4_reply.ttl = p_ip4.ttl - 1
            p4_reply.flags = 0
            p4_reply.id = 256
            self.pg_enable_capture()
            self.pg0.add_stream(p4 * 1)
            self.pg_start()
            # Three fragments are expected on pg1; reassembled, they must
            # match the original packet.
            rx = self.pg1.get_capture(3)
            reass_pkt = reassemble4(rx)
            self.validate(reass_pkt, p4_reply)

            # TODO: add a 9K-frame case: send a packet sized to current_mtu
            # with DF clear, capture the fragments on pg1 (a disabled draft
            # expected 16 of them, with reply id 512), reassemble them and
            # validate the result against the expected reply.
        finally:
            # Reset MTU
            self._restore_mtu(self.pg1.sw_if_index, current_mtu)

    def test_ip6_mtu(self):
        """IP6 MTU test"""

        current_mtu = self.get_mtu(self.pg1.sw_if_index)

        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)

        # Largest UDP payload that fits the current MTU:
        # 40 bytes of IPv6 header and 8 bytes of UDP header are subtracted.
        p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 40 - 8)

        # Baseline: at the unmodified MTU every packet is forwarded to pg1
        # with only the hop limit decremented.
        p6 = p_ether / p_ip6 / p_payload
        p6_reply = p_ip6 / p_payload
        p6_reply.hlim -= 1
        rx = self.send_and_expect(self.pg0, p6 * 9, self.pg1)
        for p in rx:
            self.validate(p[1], p6_reply)

        # Everything below runs with a lowered MTU on pg1. The original MTU
        # is restored in ``finally`` so that a failing assertion cannot leak
        # the 1280-byte MTU into the tests that run afterwards.
        try:
            # MTU (only checked on encap)
            self._set_mtu(self.pg1.sw_if_index, 1280)
            self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index))

            # Should fail. Too large MTU
            # NOTE(review): the checksum is hard-coded, presumably because
            # the expected reply is truncated to 1280 bytes below -- confirm.
            p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4C7A)
            # plen=1240 is the 1280-byte message minus the 40-byte IPv6 header.
            icmp6_reply = (
                IPv6(
                    src=self.pg0.local_ip6,
                    dst=self.pg0.remote_ip6,
                    hlim=255,
                    plen=1240,
                )
                / p_icmp6
                / p_ip6
                / p_payload
            )
            # The packet quoted inside the error is expected with its hop
            # limit already reduced by one.
            icmp6_reply[2].hlim -= 1
            # Only the first 1280 bytes of the error message are compared.
            icmp6_reply_str = bytes(icmp6_reply)[0:1280]

            rx = self.send_and_expect_some(self.pg0, p6 * 9, self.pg0)
            for p in rx:
                self.validate_bytes(bytes(p[1]), icmp6_reply_str)

            self.assert_error_counter_equal("/err/ip6-input/mtu_exceeded", 9)
        finally:
            # Reset MTU
            self._restore_mtu(self.pg1.sw_if_index, current_mtu)
200
201
if __name__ == "__main__":
    # Run directly as a script: let unittest discover the tests in this
    # module and execute them with VPP's own test runner.
    unittest.main(testRunner=VppTestRunner)