2 """IP4 and IP6 MTU functional tests"""
7 # - Verify that adjacencies inherit MTU correctly
8 # - Verify that sub-interfaces inherit MTU correctly
9 # - Different types of interfaces?
12 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
13 from scapy.layers.inet import ICMP
14 from framework import VppTestCase
15 from asfframework import VppTestRunner
16 from util import reassemble4
19 """ Test_mtu is a subclass of VPPTestCase classes.
24 class TestMTU(VppTestCase):
31 super(TestMTU, cls).setUpClass()
32 cls.create_pg_interfaces(range(2))
33 cls.interfaces = list(cls.pg_interfaces)
36 def tearDownClass(cls):
37 super(TestMTU, cls).tearDownClass()
40 super(TestMTU, self).setUp()
41 for i in self.interfaces:
50 super(TestMTU, self).tearDown()
52 for i in self.pg_interfaces:
57 def validate(self, rx, expected):
58 self.assertEqual(rx, expected.__class__(expected))
60 def validate_bytes(self, rx, expected):
61 self.assertEqual(rx, expected)
63 def payload(self, len):
66 def get_mtu(self, sw_if_index):
67 rv = self.vapi.sw_interface_dump(sw_if_index=sw_if_index)
69 if i.sw_if_index == sw_if_index:
73 def test_ip4_mtu(self):
76 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
77 p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags="DF")
79 current_mtu = self.get_mtu(self.pg1.sw_if_index)
81 p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 20 - 8)
83 p4 = p_ether / p_ip4 / p_payload
84 p4_reply = p_ip4 / p_payload
86 rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1)
88 self.validate(p[1], p4_reply)
91 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [576, 0, 0, 0])
92 self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index))
94 # Should fail. Too large MTU
97 code="fragmentation-needed",
102 IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, ttl=255, len=576, id=0)
107 n = icmp4_reply.__class__(icmp4_reply)
108 s = bytes(icmp4_reply)
109 icmp4_reply = s[0:576]
110 rx = self.send_and_expect_some(self.pg0, p4 * 11, self.pg0)
114 self.validate_bytes(bytes(p[1]), icmp4_reply)
116 self.assert_error_counter_equal("/err/ip4-input/mtu_exceeded", 11)
118 # Now with DF off. Expect fragments.
119 # First go with 1500 byte packets.
120 p_payload = UDP(sport=1234, dport=1234) / self.payload(1500 - 20 - 8)
121 p4 = p_ether / p_ip4 / p_payload
123 p4_reply = p_ip4 / p_payload
124 p4_reply.ttl = p_ip4.ttl - 1
127 self.pg_enable_capture()
128 self.pg0.add_stream(p4 * 1)
130 rx = self.pg1.get_capture(3)
131 reass_pkt = reassemble4(rx)
132 self.validate(reass_pkt, p4_reply)
135 # Now what happens with a 9K frame
136 p_payload = UDP(sport=1234, dport=1234) / self.payload(
137 current_mtu - 20 - 8)
138 p4 = p_ether / p_ip4 / p_payload
140 p4_reply = p_ip4 / p_payload
141 p4_reply.ttl = 62 # check this
145 self.pg_enable_capture()
146 self.pg0.add_stream(p4*1)
148 rx = self.pg1.get_capture(16)
149 reass_pkt = reassemble4(rx)
152 self.validate(reass_pkt, p4_reply)
156 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
158 def test_ip6_mtu(self):
161 current_mtu = self.get_mtu(self.pg1.sw_if_index)
163 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
164 p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
166 p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 40 - 8)
168 p6 = p_ether / p_ip6 / p_payload
169 p6_reply = p_ip6 / p_payload
171 rx = self.send_and_expect(self.pg0, p6 * 9, self.pg1)
173 self.validate(p[1], p6_reply)
175 # MTU (only checked on encap)
176 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
177 self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index))
179 # Should fail. Too large MTU
180 p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4C7A)
182 IPv6(src=self.pg0.local_ip6, dst=self.pg0.remote_ip6, hlim=255, plen=1240)
187 icmp6_reply[2].hlim -= 1
188 n = icmp6_reply.__class__(icmp6_reply)
189 s = bytes(icmp6_reply)
190 icmp6_reply_str = s[0:1280]
192 rx = self.send_and_expect_some(self.pg0, p6 * 9, self.pg0)
194 self.validate_bytes(bytes(p[1]), icmp6_reply_str)
196 self.assert_error_counter_equal("/err/ip6-input/mtu_exceeded", 9)
199 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
202 if __name__ == "__main__":
203 unittest.main(testRunner=VppTestRunner)