+ def test_atomic_fragment(self):
+ """IPv6 atomic fragment"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=65535)
+ / IPv6ExtHdrFragment(
+ offset=8191, m=1, res1=0xFF, res2=0xFF, nh=255, id=0xFFFF
+ )
+ / ("X" * 1452)
+ )
+
+ rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
+ self.assertIn(ICMPv6ParamProblem, rx[0])
+
+ def test_truncated_fragment(self):
+ """IPv6 truncated fragment header"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=2)
+ / IPv6ExtHdrFragment(nh=6)
+ )
+
+ self.send_and_assert_no_replies(self.pg0, [pkt])
+
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
+ / ICMPv6EchoRequest()
+ )
+ rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
+
+ def test_one_fragment(self):
+ """whole packet in one fragment processed independently"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / ICMPv6EchoRequest()
+ / Raw("X" * 1600)
+ )
+ frags = fragment_rfc8200(pkt, 1, 400)
+
+ # send a fragment with known id
+ self.send_and_assert_no_replies(self.pg0, [frags[0]])
+
+ # send an atomic fragment with same id - should be reassembled
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / IPv6ExtHdrFragment(id=1)
+ / ICMPv6EchoRequest()
+ )
+ rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
+ self.assertNotIn(IPv6ExtHdrFragment, rx)
+
+ # now finish the original reassembly, this should still be possible
+ rx = self.send_and_expect(self.pg0, frags[1:], self.pg0, n_rx=1)
+ self.assertNotIn(IPv6ExtHdrFragment, rx)
+
+ def test_bunch_of_fragments(self):
+ """valid fragments followed by rogue fragments and atomic fragment"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / ICMPv6EchoRequest()
+ / Raw("X" * 1600)
+ )
+ frags = fragment_rfc8200(pkt, 1, 400)
+ self.send_and_expect(self.pg0, frags, self.pg0, n_rx=1)
+
+ inc_frag = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / IPv6ExtHdrFragment(id=1, nh=58, offset=608)
+ / Raw("X" * 308)
+ )
+
+ self.send_and_assert_no_replies(self.pg0, inc_frag * 604)
+
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / IPv6ExtHdrFragment(id=1)
+ / ICMPv6EchoRequest()
+ )
+ rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
+ self.assertNotIn(IPv6ExtHdrFragment, rx)
+
+ def test_local_enable_disable(self):
+ """local reassembly enabled/disable"""
+ self.vapi.ip_reassembly_enable_disable(
+ sw_if_index=self.src_if.sw_if_index, enable_ip6=False
+ )
+ self.vapi.ip_local_reass_enable_disable(enable_ip6=True)
+ pkt = (
+ Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6)
+ / ICMPv6EchoRequest(id=1234)
+ / Raw("X" * 1600)
+ )
+ frags = fragment_rfc8200(pkt, 1, 400)
+ r = self.send_and_expect(self.src_if, frags, self.src_if, n_rx=1)[0]
+ self.assertEqual(1234, r[ICMPv6EchoReply].id)
+ self.vapi.ip_local_reass_enable_disable()
+
+ self.send_and_assert_no_replies(self.src_if, frags)
+ self.vapi.ip_local_reass_enable_disable(enable_ip6=True)
+