tls: propagate transport closed notifications
[vpp.git] / test / test_mtu.py
1 #!/usr/bin/env python3
2 """IP4 and IP6 MTU functional tests"""
3
4 #
5 # Add tests for:
6 # - sub interfaces
7 # - Verify that adjacencies inherit MTU correctly
8 # - Verify that sub-interfaces inherit MTU correctly
9 # - Different types of interfaces?
10 #
11 import unittest
12 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
13 from scapy.layers.inet import ICMP
14 from framework import VppTestCase
15 from asfframework import VppTestRunner
16 from util import reassemble4
17
18
19 """ Test_mtu is a subclass of VPPTestCase classes.
20     MTU tests.
21 """
22
23
24 class TestMTU(VppTestCase):
25     """MTU Test Case"""
26
27     maxDiff = None
28
29     @classmethod
30     def setUpClass(cls):
31         super(TestMTU, cls).setUpClass()
32         cls.create_pg_interfaces(range(2))
33         cls.interfaces = list(cls.pg_interfaces)
34
35     @classmethod
36     def tearDownClass(cls):
37         super(TestMTU, cls).tearDownClass()
38
39     def setUp(self):
40         super(TestMTU, self).setUp()
41         for i in self.interfaces:
42             i.admin_up()
43             i.config_ip4()
44             i.config_ip6()
45             i.disable_ipv6_ra()
46             i.resolve_arp()
47             i.resolve_ndp()
48
49     def tearDown(self):
50         super(TestMTU, self).tearDown()
51         if not self.vpp_dead:
52             for i in self.pg_interfaces:
53                 i.unconfig_ip4()
54                 i.unconfig_ip6()
55                 i.admin_down()
56
57     def validate(self, rx, expected):
58         self.assertEqual(rx, expected.__class__(expected))
59
60     def validate_bytes(self, rx, expected):
61         self.assertEqual(rx, expected)
62
63     def payload(self, len):
64         return "x" * len
65
66     def get_mtu(self, sw_if_index):
67         rv = self.vapi.sw_interface_dump(sw_if_index=sw_if_index)
68         for i in rv:
69             if i.sw_if_index == sw_if_index:
70                 return i.mtu[0]
71         return 0
72
73     def test_ip4_mtu(self):
74         """IP4 MTU test"""
75
76         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
77         p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags="DF")
78
79         current_mtu = self.get_mtu(self.pg1.sw_if_index)
80
81         p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 20 - 8)
82
83         p4 = p_ether / p_ip4 / p_payload
84         p4_reply = p_ip4 / p_payload
85         p4_reply.ttl -= 1
86         rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1)
87         for p in rx:
88             self.validate(p[1], p4_reply)
89
90         # MTU
91         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [576, 0, 0, 0])
92         self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index))
93
94         # Should fail. Too large MTU
95         p_icmp4 = ICMP(
96             type="dest-unreach",
97             code="fragmentation-needed",
98             nexthopmtu=576,
99             chksum=0x2DBB,
100         )
101         icmp4_reply = (
102             IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, ttl=254, len=576, id=0)
103             / p_icmp4
104             / p_ip4
105             / p_payload
106         )
107         n = icmp4_reply.__class__(icmp4_reply)
108         s = bytes(icmp4_reply)
109         icmp4_reply = s[0:576]
110         rx = self.send_and_expect_some(self.pg0, p4 * 11, self.pg0)
111         for p in rx:
112             # p.show2()
113             # n.show2()
114             self.validate_bytes(bytes(p[1]), icmp4_reply)
115
116         # Now with DF off. Expect fragments.
117         # First go with 1500 byte packets.
118         p_payload = UDP(sport=1234, dport=1234) / self.payload(1500 - 20 - 8)
119         p4 = p_ether / p_ip4 / p_payload
120         p4.flags = 0
121         p4_reply = p_ip4 / p_payload
122         p4_reply.ttl = p_ip4.ttl - 1
123         p4_reply.flags = 0
124         p4_reply.id = 256
125         self.pg_enable_capture()
126         self.pg0.add_stream(p4 * 1)
127         self.pg_start()
128         rx = self.pg1.get_capture(3)
129         reass_pkt = reassemble4(rx)
130         self.validate(reass_pkt, p4_reply)
131
132         """
133         # Now what happens with a 9K frame
134         p_payload = UDP(sport=1234, dport=1234) / self.payload(
135             current_mtu - 20 - 8)
136         p4 = p_ether / p_ip4 / p_payload
137         p4.flags = 0
138         p4_reply = p_ip4 / p_payload
139         p4_reply.ttl = 62 # check this
140         p4_reply.flags = 0
141         p4_reply.id = 512
142
143         self.pg_enable_capture()
144         self.pg0.add_stream(p4*1)
145         self.pg_start()
146         rx = self.pg1.get_capture(16)
147         reass_pkt = reassemble4(rx)
148         reass_pkt.show2()
149         p4_reply.show2()
150         self.validate(reass_pkt, p4_reply)
151         """
152
153         # Reset MTU
154         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
155
156     def test_ip6_mtu(self):
157         """IP6 MTU test"""
158
159         current_mtu = self.get_mtu(self.pg1.sw_if_index)
160
161         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
162         p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
163
164         p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 40 - 8)
165
166         p6 = p_ether / p_ip6 / p_payload
167         p6_reply = p_ip6 / p_payload
168         p6_reply.hlim -= 1
169         rx = self.send_and_expect(self.pg0, p6 * 9, self.pg1)
170         for p in rx:
171             self.validate(p[1], p6_reply)
172
173         # MTU (only checked on encap)
174         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
175         self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index))
176
177         # Should fail. Too large MTU
178         p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4C7A)
179         icmp6_reply = (
180             IPv6(src=self.pg0.local_ip6, dst=self.pg0.remote_ip6, hlim=255, plen=1240)
181             / p_icmp6
182             / p_ip6
183             / p_payload
184         )
185         icmp6_reply[2].hlim -= 1
186         n = icmp6_reply.__class__(icmp6_reply)
187         s = bytes(icmp6_reply)
188         icmp6_reply_str = s[0:1280]
189
190         rx = self.send_and_expect_some(self.pg0, p6 * 9, self.pg0)
191         for p in rx:
192             self.validate_bytes(bytes(p[1]), icmp6_reply_str)
193
194         # Reset MTU
195         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
196
197
198 if __name__ == "__main__":
199     unittest.main(testRunner=VppTestRunner)