+ #
+ # MTU exceeded
+ #
+ p_mtu = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ ttl=10, flags='DF') /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 2000))
+
+ self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1500, 0, 0, 0])
+
+ rx = self.send_and_expect(self.pg0, p_mtu * NUM_PKTS, self.pg0)
+ rx = rx[0]
+ icmp = rx[ICMP]
+
+ self.assertEqual(icmptypes[icmp.type], "dest-unreach")
+ self.assertEqual(icmpcodes[icmp.type][icmp.code],
+ "fragmentation-needed")
+ self.assertEqual(icmp.src, self.pg0.remote_ip4)
+ self.assertEqual(icmp.dst, self.pg1.remote_ip4)
+
+ self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2500, 0, 0, 0])
+ rx = self.send_and_expect(self.pg0, p_mtu * NUM_PKTS, self.pg1)
+
+ # Reset MTU for subsequent tests
+ self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [9000, 0, 0, 0])
+
+ #
+ # source address 0.0.0.0 and 25.255.255.255 and for-us
+ #
+ p_s0 = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src="0.0.0.0",
+ dst=self.pg0.local_ip4) /
+ ICMP(id=4, seq=4) /
+ Raw(load='\x0a' * 18))
+ rx = self.send_and_assert_no_replies(self.pg0, p_s0 * 17)
+
+ p_s0 = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src="255.255.255.255",
+ dst=self.pg0.local_ip4) /
+ ICMP(id=4, seq=4) /
+ Raw(load='\x0a' * 18))
+ rx = self.send_and_assert_no_replies(self.pg0, p_s0 * 17)
+
+
+class TestIPDirectedBroadcast(VppTestCase):
+ """ IPv4 Directed Broadcast """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDirectedBroadcast, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDirectedBroadcast, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPDirectedBroadcast, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+
+ def tearDown(self):
+ super(TestIPDirectedBroadcast, self).tearDown()
+ for i in self.pg_interfaces:
+ i.admin_down()
+
+ def test_ip_input(self):
+ """ IP Directed Broadcast """
+
+ #
+ # set the directed broadcast on pg0 first, then config IP4 addresses
+ # for pg1 directed broadcast is always disabled
+ self.vapi.sw_interface_set_ip_directed_broadcast(
+ self.pg0.sw_if_index, 1)
+
+ p0 = (Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac) /
+ IP(src="1.1.1.1",
+ dst=self.pg0._local_ip4_bcast) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 2000))
+ p1 = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src="1.1.1.1",
+ dst=self.pg1._local_ip4_bcast) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 2000))
+
+ self.pg0.config_ip4()
+ self.pg0.resolve_arp()
+ self.pg1.config_ip4()
+ self.pg1.resolve_arp()
+
+ #
+ # test packet is L2 broadcast
+ #
+ rx = self.send_and_expect(self.pg1, p0 * NUM_PKTS, self.pg0)
+ self.assertTrue(rx[0][Ether].dst, "ff:ff:ff:ff:ff:ff")
+
+ self.send_and_assert_no_replies(self.pg0, p1 * NUM_PKTS,
+ "directed broadcast disabled")
+
+ #
+ # toggle directed broadcast on pg0
+ #
+ self.vapi.sw_interface_set_ip_directed_broadcast(
+ self.pg0.sw_if_index, 0)
+ self.send_and_assert_no_replies(self.pg1, p0 * NUM_PKTS,
+ "directed broadcast disabled")
+
+ self.vapi.sw_interface_set_ip_directed_broadcast(
+ self.pg0.sw_if_index, 1)
+ rx = self.send_and_expect(self.pg1, p0 * NUM_PKTS, self.pg0)
+
+ self.pg0.unconfig_ip4()
+ self.pg1.unconfig_ip4()
+
+
+class TestIPLPM(VppTestCase):
+ """ IPv4 longest Prefix Match """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPLPM, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPLPM, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPLPM, self).setUp()
+
+ self.create_pg_interfaces(range(4))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestIPLPM, self).tearDown()
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig_ip4()
+
+ def test_ip_lpm(self):
+ """ IP longest Prefix Match """
+
+ s_24 = VppIpRoute(self, "10.1.2.0", 24,
+ [VppRoutePath(self.pg1.remote_ip4,
+ self.pg1.sw_if_index)])
+ s_24.add_vpp_config()
+ s_8 = VppIpRoute(self, "10.0.0.0", 8,
+ [VppRoutePath(self.pg2.remote_ip4,
+ self.pg2.sw_if_index)])
+ s_8.add_vpp_config()
+
+ p_8 = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src="1.1.1.1",
+ dst="10.1.1.1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 2000))
+ p_24 = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src="1.1.1.1",
+ dst="10.1.2.1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 2000))
+
+ self.logger.info(self.vapi.cli("sh ip fib mtrie"))
+ rx = self.send_and_expect(self.pg0, p_8 * NUM_PKTS, self.pg2)
+ rx = self.send_and_expect(self.pg0, p_24 * NUM_PKTS, self.pg1)
+
+
+class TestIPv4Frag(VppTestCase):
+ """ IPv4 fragmentation """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv4Frag, cls).setUpClass()
+
+ cls.create_pg_interfaces([0, 1])
+ cls.src_if = cls.pg0
+ cls.dst_if = cls.pg1
+
+ # setup all interfaces
+ for i in cls.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4Frag, cls).tearDownClass()
+
+ def test_frag_large_packets(self):
+ """ Fragmentation of large packets """
+
+ p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
+ IP(src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) /
+ UDP(sport=1234, dport=5678) / Raw())
+ self.extend_packet(p, 6000, "abcde")
+ saved_payload = p[Raw].load
+
+ # Force fragmentation by setting MTU of output interface
+ # lower than packet size
+ self.vapi.sw_interface_set_mtu(self.dst_if.sw_if_index,
+ [5000, 0, 0, 0])
+
+ self.pg_enable_capture()
+ self.src_if.add_stream(p)
+ self.pg_start()
+
+ # Expecting 3 fragments because size of created fragments currently
+ # cannot be larger then VPP buffer size (which is 2048)
+ packets = self.dst_if.get_capture(3)
+
+ # Assume VPP sends the fragments in order
+ payload = ''
+ for p in packets:
+ payload_offset = p.frag * 8
+ if payload_offset > 0:
+ payload_offset -= 8 # UDP header is not in payload
+ self.assert_equal(payload_offset, len(payload))
+ payload += p[Raw].load
+ self.assert_equal(payload, saved_payload, "payload")
+