build: don't overwrite quicly build/install logs
[vpp.git] / test / test_mtu.py
index cef0ec0..922d83d 100644 (file)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 """IP4 and IP6 MTU functional tests"""
 
 #
 """IP4 and IP6 MTU functional tests"""
 
 #
@@ -12,32 +12,21 @@ import unittest
 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
 from scapy.layers.inet import ICMP
 from framework import VppTestCase, VppTestRunner
 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
 from scapy.layers.inet import ICMP
 from framework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
+from vpp_ip import DpoProto
+from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
 from socket import AF_INET, AF_INET6, inet_pton
 from socket import AF_INET, AF_INET6, inet_pton
-import StringIO
+from util import reassemble4
+
 
 """ Test_mtu is a subclass of VPPTestCase classes.
     MTU tests.
 """
 
 
 
 """ Test_mtu is a subclass of VPPTestCase classes.
     MTU tests.
 """
 
 
-def reassemble(listoffragments):
-    buffer = StringIO.StringIO()
-    first = listoffragments[0]
-    buffer.seek(20)
-    for pkt in listoffragments:
-        # pkt.show2()
-        buffer.seek(pkt[IP].frag*8)
-        buffer.write(pkt[IP].payload)
-    first.len = len(buffer.getvalue()) + 20
-    first.flags = 0
-    del(first.chksum)
-    header = str(first[IP])[:20]
-    return first[IP].__class__(header + buffer.getvalue())
-
-
 class TestMTU(VppTestCase):
 class TestMTU(VppTestCase):
-    """ MTU Test Case """
+    """MTU Test Case"""
+
+    maxDiff = None
 
     @classmethod
     def setUpClass(cls):
 
     @classmethod
     def setUpClass(cls):
@@ -45,9 +34,13 @@ class TestMTU(VppTestCase):
         cls.create_pg_interfaces(range(2))
         cls.interfaces = list(cls.pg_interfaces)
 
         cls.create_pg_interfaces(range(2))
         cls.interfaces = list(cls.pg_interfaces)
 
-    def setUp(cls):
-        super(TestMTU, cls).setUp()
-        for i in cls.interfaces:
+    @classmethod
+    def tearDownClass(cls):
+        super(TestMTU, cls).tearDownClass()
+
+    def setUp(self):
+        super(TestMTU, self).setUp()
+        for i in self.interfaces:
             i.admin_up()
             i.config_ip4()
             i.config_ip6()
             i.admin_up()
             i.config_ip4()
             i.config_ip6()
@@ -64,88 +57,82 @@ class TestMTU(VppTestCase):
                 i.admin_down()
 
     def validate(self, rx, expected):
                 i.admin_down()
 
     def validate(self, rx, expected):
-        self.assertEqual(rx, expected.__class__(str(expected)))
+        self.assertEqual(rx, expected.__class__(expected))
 
     def validate_bytes(self, rx, expected):
         self.assertEqual(rx, expected)
 
     def payload(self, len):
 
     def validate_bytes(self, rx, expected):
         self.assertEqual(rx, expected)
 
     def payload(self, len):
-        return 'x' * len
+        return "x" * len
 
     def get_mtu(self, sw_if_index):
 
     def get_mtu(self, sw_if_index):
-        rv = self.vapi.sw_interface_dump()
+        rv = self.vapi.sw_interface_dump(sw_if_index=sw_if_index)
         for i in rv:
             if i.sw_if_index == sw_if_index:
         for i in rv:
             if i.sw_if_index == sw_if_index:
-                return i.link_mtu
+                return i.mtu[0]
         return 0
 
     def test_ip4_mtu(self):
         return 0
 
     def test_ip4_mtu(self):
-        """ IP4 MTU test """
+        """IP4 MTU test"""
 
 
-        #
-        # TODO: Link MTU is 216 bytes 'off'. Fix when L3 MTU patches committed
-        #
-        mtu_offset = 216
         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
-        p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4,
-                   flags='DF')
+        p_ip4 = IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, flags="DF")
 
 
-        # TODO: Re-enable when MTU fixes are committed
         current_mtu = self.get_mtu(self.pg1.sw_if_index)
         current_mtu = self.get_mtu(self.pg1.sw_if_index)
-        current_mtu -= mtu_offset
 
 
-        p_payload = UDP(sport=1234, dport=1234) / self.payload(
-            current_mtu - 20 - 8)
+        p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 20 - 8)
 
         p4 = p_ether / p_ip4 / p_payload
         p4_reply = p_ip4 / p_payload
         p4_reply.ttl -= 1
 
         p4 = p_ether / p_ip4 / p_payload
         p4_reply = p_ip4 / p_payload
         p4_reply.ttl -= 1
-        rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
+        rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1)
         for p in rx:
             self.validate(p[1], p4_reply)
 
         # MTU
         for p in rx:
             self.validate(p[1], p4_reply)
 
         # MTU
-        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 576 + mtu_offset)
-        self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index) - mtu_offset)
+        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [576, 0, 0, 0])
+        self.assertEqual(576, self.get_mtu(self.pg1.sw_if_index))
 
         # Should fail. Too large MTU
 
         # Should fail. Too large MTU
-        p_icmp4 = ICMP(type='dest-unreach', code='fragmentation-needed',
-                       nexthopmtu=576, chksum=0x2dbb)
-        icmp4_reply = (IP(src=self.pg0.local_ip4,
-                          dst=self.pg0.remote_ip4,
-                          ttl=254, len=576, id=0) /
-                       p_icmp4 / p_ip4 / p_payload)
-        icmp4_reply[1].ttl -= 1
-        n = icmp4_reply.__class__(str(icmp4_reply))
-        s = str(icmp4_reply)
+        p_icmp4 = ICMP(
+            type="dest-unreach",
+            code="fragmentation-needed",
+            nexthopmtu=576,
+            chksum=0x2DBB,
+        )
+        icmp4_reply = (
+            IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, ttl=254, len=576, id=0)
+            / p_icmp4
+            / p_ip4
+            / p_payload
+        )
+        n = icmp4_reply.__class__(icmp4_reply)
+        s = bytes(icmp4_reply)
         icmp4_reply = s[0:576]
         icmp4_reply = s[0:576]
-        rx = self.send_and_expect(self.pg0, p4*11, self.pg0)
+        rx = self.send_and_expect_some(self.pg0, p4 * 11, self.pg0)
         for p in rx:
             # p.show2()
             # n.show2()
         for p in rx:
             # p.show2()
             # n.show2()
-            self.validate_bytes(str(p[1]), icmp4_reply)
+            self.validate_bytes(bytes(p[1]), icmp4_reply)
 
 
-        '''
         # Now with DF off. Expect fragments.
         # First go with 1500 byte packets.
         # Now with DF off. Expect fragments.
         # First go with 1500 byte packets.
-        p_payload = UDP(sport=1234, dport=1234) / self.payload(
-            1500 - 20 - 8)
+        p_payload = UDP(sport=1234, dport=1234) / self.payload(1500 - 20 - 8)
         p4 = p_ether / p_ip4 / p_payload
         p4.flags = 0
         p4_reply = p_ip4 / p_payload
         p4 = p_ether / p_ip4 / p_payload
         p4.flags = 0
         p4_reply = p_ip4 / p_payload
-        p4_reply.ttl = 62 # check this
+        p4_reply.ttl = p_ip4.ttl - 1
         p4_reply.flags = 0
         p4_reply.id = 256
         self.pg_enable_capture()
         p4_reply.flags = 0
         p4_reply.id = 256
         self.pg_enable_capture()
-        self.pg0.add_stream(p4*1)
+        self.pg0.add_stream(p4 * 1)
         self.pg_start()
         rx = self.pg1.get_capture(3)
         self.pg_start()
         rx = self.pg1.get_capture(3)
-        print('RX', len(rx))
-        reass_pkt = reassemble(rx)
+        reass_pkt = reassemble4(rx)
         self.validate(reass_pkt, p4_reply)
         self.validate(reass_pkt, p4_reply)
-        '''
+
+        """
         # Now what happens with a 9K frame
         # Now what happens with a 9K frame
-        '''
         p_payload = UDP(sport=1234, dport=1234) / self.payload(
             current_mtu - 20 - 8)
         p4 = p_ether / p_ip4 / p_payload
         p_payload = UDP(sport=1234, dport=1234) / self.payload(
             current_mtu - 20 - 8)
         p4 = p_ether / p_ip4 / p_payload
@@ -159,60 +146,56 @@ class TestMTU(VppTestCase):
         self.pg0.add_stream(p4*1)
         self.pg_start()
         rx = self.pg1.get_capture(16)
         self.pg0.add_stream(p4*1)
         self.pg_start()
         rx = self.pg1.get_capture(16)
-        reass_pkt = reassemble(rx)
+        reass_pkt = reassemble4(rx)
         reass_pkt.show2()
         p4_reply.show2()
         self.validate(reass_pkt, p4_reply)
         reass_pkt.show2()
         p4_reply.show2()
         self.validate(reass_pkt, p4_reply)
-        '''
+        """
+
         # Reset MTU
         # Reset MTU
-        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
-                                       current_mtu + mtu_offset)
+        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
 
     def test_ip6_mtu(self):
 
     def test_ip6_mtu(self):
-        """ IP6 MTU test """
+        """IP6 MTU test"""
 
 
-        #
-        # TODO: Link MTU is 216 bytes 'off'. Fix when L3 MTU patches committed
-        #
-        mtu_offset = 216
         current_mtu = self.get_mtu(self.pg1.sw_if_index)
         current_mtu = self.get_mtu(self.pg1.sw_if_index)
-        current_mtu -= mtu_offset
 
         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
         p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
 
 
         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
         p_ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
 
-        p_payload = UDP(sport=1234, dport=1234) / self.payload(
-            current_mtu - 40 - 8)
+        p_payload = UDP(sport=1234, dport=1234) / self.payload(current_mtu - 40 - 8)
 
         p6 = p_ether / p_ip6 / p_payload
         p6_reply = p_ip6 / p_payload
         p6_reply.hlim -= 1
 
         p6 = p_ether / p_ip6 / p_payload
         p6_reply = p_ip6 / p_payload
         p6_reply.hlim -= 1
-        rx = self.send_and_expect(self.pg0, p6*9, self.pg1)
+        rx = self.send_and_expect(self.pg0, p6 * 9, self.pg1)
         for p in rx:
             self.validate(p[1], p6_reply)
 
         # MTU (only checked on encap)
         for p in rx:
             self.validate(p[1], p6_reply)
 
         # MTU (only checked on encap)
-        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, 1280 + mtu_offset)
-        self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index) - mtu_offset)
+        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
+        self.assertEqual(1280, self.get_mtu(self.pg1.sw_if_index))
 
         # Should fail. Too large MTU
 
         # Should fail. Too large MTU
-        p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4c7a)
-        icmp6_reply = (IPv6(src=self.pg0.local_ip6,
-                            dst=self.pg0.remote_ip6,
-                            hlim=254, plen=1240) /
-                       p_icmp6 / p_ip6 / p_payload)
+        p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0x4C7A)
+        icmp6_reply = (
+            IPv6(src=self.pg0.local_ip6, dst=self.pg0.remote_ip6, hlim=255, plen=1240)
+            / p_icmp6
+            / p_ip6
+            / p_payload
+        )
         icmp6_reply[2].hlim -= 1
         icmp6_reply[2].hlim -= 1
-        n = icmp6_reply.__class__(str(icmp6_reply))
-        s = str(icmp6_reply)
-        icmp6_reply = s[0:1280]
+        n = icmp6_reply.__class__(icmp6_reply)
+        s = bytes(icmp6_reply)
+        icmp6_reply_str = s[0:1280]
 
 
-        rx = self.send_and_expect(self.pg0, p6*9, self.pg0)
+        rx = self.send_and_expect_some(self.pg0, p6 * 9, self.pg0)
         for p in rx:
         for p in rx:
-            self.validate_bytes(str(p[1]), icmp6_reply)
+            self.validate_bytes(bytes(p[1]), icmp6_reply_str)
 
         # Reset MTU
 
         # Reset MTU
-        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, current_mtu)
+        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [current_mtu, 0, 0, 0])
 
 
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)
     unittest.main(testRunner=VppTestRunner)