2 """IP4 and IP6 MTU functional tests"""
7 # - Verify that adjacencies inherit MTU correctly
8 # - Verify that sub-interfaces inherit MTU correctly
9 # - Different types of interfaces?
12 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
13 from scapy.layers.inet import ICMP
14 from framework import VppTestCase
15 from asfframework import VppTestRunner
16 from util import reassemble4
19 """ TestMTU is a subclass of the VppTestCase class.
# MTU test case class, derived from VppTestCase.
24 class TestMTU(VppTestCase):
# Class-level setup steps: run the parent class's setUpClass, create two pg
# interfaces (indices from range(2)), and keep a list copy of cls.pg_interfaces
# in cls.interfaces for later per-test use.
31 super(TestMTU, cls).setUpClass()
32 cls.create_pg_interfaces(range(2))
33 cls.interfaces = list(cls.pg_interfaces)
def tearDownClass(cls):
    """Tear down class-level state by delegating to the parent class's tearDownClass."""
    parent_hooks = super(TestMTU, cls)
    parent_hooks.tearDownClass()
# Per-test setup steps: run the parent class's setUp, then iterate over the
# interfaces stored in self.interfaces (the per-interface steps are outside the
# visible lines).
40 super(TestMTU, self).setUp()
41 for i in self.interfaces:
# Per-test teardown steps: run the parent class's tearDown, then iterate over
# self.pg_interfaces (the per-interface steps are outside the visible lines).
50 super(TestMTU, self).tearDown()
52 for i in self.pg_interfaces:
def validate(self, rx, expected):
    """Assert that rx equals expected after expected is rebuilt through its own class.

    expected is passed to the constructor of its own class, and the resulting
    object is what rx is compared against with assertEqual.
    """
    expected_cls = expected.__class__
    rebuilt = expected_cls(expected)
    self.assertEqual(rx, rebuilt)
def validate_bytes(self, rx, expected):
    """Assert that rx equals expected as given, with no conversion of either value."""
    received, wanted = rx, expected
    self.assertEqual(received, wanted)
# Helper that produces the data the tests place after the UDP header; the tests
# pass a size computed from the MTU minus the header sizes as `len`.
# NOTE(review): the body is not visible here - presumably returns filler data of
# size `len`; confirm. The parameter name shadows the builtin `len`.
63 def payload(self, len):
# Look up the MTU of the interface identified by sw_if_index.
# Dumps interface details through self.vapi.sw_interface_dump and matches
# entries on their sw_if_index field.
# NOTE(review): the value returned for a matching entry is not visible here -
# the tests compare the result with the first element of the list passed to
# sw_interface_set_mtu; confirm.
66 def get_mtu(self, sw_if_index):
67 rv = self.vapi.sw_interface_dump(sw_if_index=sw_if_index)
69 if i.sw_if_index == sw_if_index:
# IPv4 MTU test. Sends traffic from pg0 towards pg1 at pg1's current MTU, then
# lowers pg1's MTU to 576 and checks the reply for oversized DF packets, then
# checks reassembled fragments for two packet sizes, and finally writes the
# original MTU back to pg1.
73 def test_ip4_mtu(self):
# Ethernet and IPv4 headers for packets entering pg0 and addressed to pg1's
# remote IPv4 address; the IPv4 header carries the "DF" flag.
76 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
77 p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags="DF")
# Remember pg1's MTU; it sizes the payloads below and is restored at the end.
79 current_mtu = self.get_mtu(self.pg1.sw_if_index)
# Payload size is current_mtu minus 20 and 8, i.e. the sizes of an option-less
# IPv4 header and a UDP header.
81 p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 20 - 8)
83 p4 = p_ether / p_ip4 / p_payload
84 p4_reply = p_ip4 / p_payload
# Send 11 copies into pg0 and expect them on pg1.
86 rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1)
# NOTE(review): `p` is not bound in the visible lines - presumably each packet
# of rx; confirm. p[1] is the layer compared with the expected reply.
88 self.validate(p[1], p4_reply)
# Set pg1's MTU to 576 and confirm get_mtu reports the new value.
91 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [576, 0, 0, 0])
92 self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index))
94 # Should fail. Too large MTU
# Pieces of the expected reply: an ICMP code of "fragmentation-needed" and an
# IPv4 header from pg0's local address to pg0's remote address.
# NOTE(review): the statements that combine these into icmp4_reply are not
# visible here.
97 code="fragmentation-needed",
102 IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, ttl=254, len=576, id=0)
# Serialise the expected reply and keep only its first 576 bytes for the
# comparison; `n` is not used in the visible lines.
107 n = icmp4_reply.__class__(icmp4_reply)
108 s = bytes(icmp4_reply)
109 icmp4_reply = s[0:576]
# Send 11 copies into pg0 again; this time the capture is taken on pg0 itself.
110 rx = self.send_and_expect_some(self.pg0, p4 * 11, self.pg0)
# Compare the raw bytes of p[1] with the truncated expected reply.
114 self.validate_bytes(bytes(p[1]), icmp4_reply)
116 # Now with DF off. Expect fragments.
117 # First go with 1500 byte packets.
118 p_payload = UDP(sport=1234, dport=1234) / self.payload(1500 - 20 - 8)
119 p4 = p_ether / p_ip4 / p_payload
# Expected packet after forwarding: same headers and payload, TTL one lower.
121 p4_reply = p_ip4 / p_payload
122 p4_reply.ttl = p_ip4.ttl - 1
# Queue a single packet on pg0, capture 3 packets on pg1, reassemble them with
# reassemble4 and compare the result with the expected packet.
125 self.pg_enable_capture()
126 self.pg0.add_stream(p4 * 1)
128 rx = self.pg1.get_capture(3)
129 reass_pkt = reassemble4(rx)
130 self.validate(reass_pkt, p4_reply)
133 # Now what happens with a 9K frame
# Payload sized from the MTU that was saved before it was lowered to 576.
134 p_payload = UDP(sport=1234, dport=1234) / self.payload(
135 current_mtu - 20 - 8)
136 p4 = p_ether / p_ip4 / p_payload
138 p4_reply = p_ip4 / p_payload
139 p4_reply.ttl = 62 # check this
# Same capture-and-reassemble sequence as above, this time with 16 captured
# packets on pg1.
143 self.pg_enable_capture()
144 self.pg0.add_stream(p4*1)
146 rx = self.pg1.get_capture(16)
147 reass_pkt = reassemble4(rx)
150 self.validate(reass_pkt, p4_reply)
# Restore pg1's original MTU.
154 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
# IPv6 MTU test. Sends traffic from pg0 towards pg1 at pg1's current MTU, then
# lowers pg1's MTU to 1280 and checks the reply for oversized packets, and
# finally writes the original MTU back to pg1.
156 def test_ip6_mtu(self):
# Remember pg1's MTU; it sizes the payload below and is restored at the end.
159 current_mtu = self.get_mtu(self.pg1.sw_if_index)
# Ethernet and IPv6 headers for packets entering pg0 and addressed to pg1's
# remote IPv6 address.
161 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
162 p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
# Payload size is current_mtu minus 40 and 8, i.e. the sizes of the fixed IPv6
# header and a UDP header.
164 p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 40 - 8)
166 p6 = p_ether / p_ip6 / p_payload
167 p6_reply = p_ip6 / p_payload
# Send 9 copies into pg0 and expect them on pg1.
169 rx = self.send_and_expect(self.pg0, p6 * 9, self.pg1)
# NOTE(review): `p` is not bound in the visible lines - presumably each packet
# of rx; confirm. p[1] is the layer compared with the expected reply.
171 self.validate(p[1], p6_reply)
173 # MTU (only checked on encap)
# Set pg1's MTU to 1280 and confirm get_mtu reports the new value.
174 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
175 self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index))
177 # Should fail. Too large MTU
# Pieces of the expected reply: an ICMPv6 Packet Too Big message advertising
# MTU 1280 (with a fixed checksum value) and an IPv6 header from pg0's local
# address to pg0's remote address.
# NOTE(review): the statements that combine these into icmp6_reply are not
# visible here.
178 p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4C7A)
180 IPv6(src=self.pg0.local_ip6, dst=self.pg0.remote_ip6, hlim=255, plen=1240)
# Lower the hop limit of layer 2 of the expected reply by one, then serialise it
# and keep only the first 1280 bytes; `n` is not used in the visible lines.
185 icmp6_reply[2].hlim -= 1
186 n = icmp6_reply.__class__(icmp6_reply)
187 s = bytes(icmp6_reply)
188 icmp6_reply_str = s[0:1280]
# Send 9 copies into pg0 again; this time the capture is taken on pg0 itself.
190 rx = self.send_and_expect_some(self.pg0, p6 * 9, self.pg0)
# Compare the raw bytes of p[1] with the truncated expected reply.
192 self.validate_bytes(bytes(p[1]), icmp6_reply_str)
# Restore pg1's original MTU.
195 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
# Script entry point: when this module is executed directly, hand control to
# unittest and have it use VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)