X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_mtu.py;h=3203e40e1148ec97bf3c186ef881ec9846bf8456;hb=fc7344f;hp=abf0c94b268717028e9dbb1fce8989b94252b937;hpb=8a9c8f1412cb1258340b18a8eb622a835ef3c37b;p=vpp.git diff --git a/test/test_mtu.py b/test/test_mtu.py index abf0c94b268..3203e40e114 100644 --- a/test/test_mtu.py +++ b/test/test_mtu.py @@ -12,32 +12,20 @@ import unittest from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig from scapy.layers.inet import ICMP from framework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto +from vpp_ip import DpoProto +from vpp_ip_route import VppIpRoute, VppRoutePath from socket import AF_INET, AF_INET6, inet_pton -import StringIO +from util import reassemble4 + """ Test_mtu is a subclass of VPPTestCase classes. MTU tests. """ -def reassemble(listoffragments): - buffer = StringIO.StringIO() - first = listoffragments[0] - buffer.seek(20) - for pkt in listoffragments: - # pkt.show2() - buffer.seek(pkt[IP].frag*8) - buffer.write(pkt[IP].payload) - first.len = len(buffer.getvalue()) + 20 - first.flags = 0 - del(first.chksum) - header = str(first[IP])[:20] - return first[IP].__class__(header + buffer.getvalue()) - - class TestMTU(VppTestCase): """ MTU Test Case """ + maxDiff = None @classmethod def setUpClass(cls): @@ -45,9 +33,9 @@ class TestMTU(VppTestCase): cls.create_pg_interfaces(range(2)) cls.interfaces = list(cls.pg_interfaces) - def setUp(cls): - super(TestMTU, cls).setUp() - for i in cls.interfaces: + def setUp(self): + super(TestMTU, self).setUp() + for i in self.interfaces: i.admin_up() i.config_ip4() i.config_ip6() @@ -64,7 +52,7 @@ class TestMTU(VppTestCase): i.admin_down() def validate(self, rx, expected): - self.assertEqual(rx, expected.__class__(str(expected))) + self.assertEqual(rx, expected.__class__(expected)) def validate_bytes(self, rx, expected): self.assertEqual(rx, expected) @@ -76,23 +64,17 @@ class TestMTU(VppTestCase): rv = self.vapi.sw_interface_dump() for i in rv: if i.sw_if_index == sw_if_index: - return i.link_mtu + return i.mtu[0] return 0 def test_ip4_mtu(self): """ IP4 MTU test """ - # - # TODO: Link MTU is 216 bytes 'off'. Fix when L3 MTU patches committed - # - mtu_offset = 216 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags='DF') - # TODO: Re-enable when MTU fixes are committed current_mtu = self.get_mtu(self.pg1.sw_if_index) - current_mtu -= mtu_offset p_payload = UDP(sport=1234, dport=1234) / self.payload( current_mtu - 20 - 8) @@ -105,8 +87,8 @@ class TestMTU(VppTestCase): self.validate(p[1], p4_reply) # MTU - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 576 + mtu_offset) - self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index) - mtu_offset) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [576, 0, 0, 0]) + self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index)) # Should fail. Too large MTU p_icmp4 = ICMP(type='dest-unreach', code='fragmentation-needed', @@ -116,16 +98,15 @@ class TestMTU(VppTestCase): ttl=254, len=576, id=0) / p_icmp4 / p_ip4 / p_payload) icmp4_reply[1].ttl -= 1 - n = icmp4_reply.__class__(str(icmp4_reply)) - s = str(icmp4_reply) + n = icmp4_reply.__class__(icmp4_reply) + s = bytes(icmp4_reply) icmp4_reply = s[0:576] rx = self.send_and_expect(self.pg0, p4*11, self.pg0) for p in rx: # p.show2() # n.show2() - self.validate_bytes(str(p[1]), icmp4_reply) + self.validate_bytes(bytes(p[1]), icmp4_reply) - ''' # Now with DF off. Expect fragments. # First go with 1500 byte packets. p_payload = UDP(sport=1234, dport=1234) / self.payload( @@ -133,19 +114,18 @@ class TestMTU(VppTestCase): p4 = p_ether / p_ip4 / p_payload p4.flags = 0 p4_reply = p_ip4 / p_payload - p4_reply.ttl = 62 # check this + p4_reply.ttl = 62 # check this p4_reply.flags = 0 p4_reply.id = 256 self.pg_enable_capture() self.pg0.add_stream(p4*1) self.pg_start() rx = self.pg1.get_capture(3) - print('RX', len(rx)) - reass_pkt = reassemble(rx) + reass_pkt = reassemble4(rx) self.validate(reass_pkt, p4_reply) + ''' # Now what happens with a 9K frame - ''' p_payload = UDP(sport=1234, dport=1234) / self.payload( current_mtu - 20 - 8) p4 = p_ether / p_ip4 / p_payload @@ -159,22 +139,24 @@ class TestMTU(VppTestCase): self.pg0.add_stream(p4*1) self.pg_start() rx = self.pg1.get_capture(16) - reass_pkt = reassemble(rx) + reass_pkt = reassemble4(rx) reass_pkt.show2() p4_reply.show2() self.validate(reass_pkt, p4_reply) ''' + # Reset MTU - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, current_mtu) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, + [current_mtu, 0, 0, 0]) - @unittest.skip("Enable when IPv6 fragmentation is added") def test_ip6_mtu(self): """ IP6 MTU test """ + current_mtu = self.get_mtu(self.pg1.sw_if_index) + p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) - current_mtu = self.get_mtu(self.pg1.sw_if_index) p_payload = UDP(sport=1234, dport=1234) / self.payload( current_mtu - 40 - 8) @@ -186,26 +168,27 @@ class TestMTU(VppTestCase): self.validate(p[1], p6_reply) # MTU (only checked on encap) - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 1280) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0]) self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index)) # Should fail. Too large MTU p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4c7a) icmp6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg0.remote_ip6, - hlim=254, plen=1240) / + hlim=255, plen=1240) / p_icmp6 / p_ip6 / p_payload) icmp6_reply[2].hlim -= 1 - n = icmp6_reply.__class__(str(icmp6_reply)) - s = str(icmp6_reply) - icmp6_reply = s[0:1280] + n = icmp6_reply.__class__(icmp6_reply) + s = bytes(icmp6_reply) + icmp6_reply_str = s[0:1280] rx = self.send_and_expect(self.pg0, p6*9, self.pg0) for p in rx: - self.validate_bytes(str(p[1]), icmp6_reply) + self.validate_bytes(bytes(p[1]), icmp6_reply_str) # Reset MTU - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, current_mtu) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, + [current_mtu, 0, 0, 0]) if __name__ == '__main__':