+ self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
+ expire_walk_interval_ms=10000,
+ is_ip6=is_ip6)
+
+ self.pg_enable_capture()
+ self.src_if.add_stream(stream)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(
+ len(self.pkt_infos) - len(dropped_packet_indexes))
+ self.verify_capture(scapy_ip_family, packets, dropped_packet_indexes)
+ self.src_if.assert_nothing_captured()
+
+ def test_duplicates(self, scapy_ip_family, stream):
+ """ duplicate fragments """
+ validate_scapy_ip_family(scapy_ip_family)
+
+ self.pg_enable_capture()
+ self.src_if.add_stream(stream)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(len(self.pkt_infos))
+ self.verify_capture(scapy_ip_family, packets)
+ self.src_if.assert_nothing_captured()
+
+ def test_random(self, scapy_ip_family, stream):
+ """ random order reassembly """
+ validate_scapy_ip_family(scapy_ip_family)
+
+ fragments = list(stream)
+ shuffle(fragments)
+
+ self.pg_enable_capture()
+ self.src_if.add_stream(fragments)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(len(self.packet_infos))
+ self.verify_capture(scapy_ip_family, packets)
+ self.src_if.assert_nothing_captured()
+
+ # run it all again to verify correctness
+ self.pg_enable_capture()
+ self.src_if.add_stream(fragments)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(len(self.packet_infos))
+ self.verify_capture(scapy_ip_family, packets)
+ self.src_if.assert_nothing_captured()
+
+ def test_reassembly(self, scapy_ip_family, stream):
+ """ basic reassembly """
+ validate_scapy_ip_family(scapy_ip_family)
+
+ self.pg_enable_capture()
+ self.src_if.add_stream(stream)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(len(self.pkt_infos))
+ self.verify_capture(scapy_ip_family, packets)
+ self.src_if.assert_nothing_captured()
+
+ # run it all again to verify correctness
+ self.pg_enable_capture()
+ self.src_if.add_stream(stream)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(len(self.pkt_infos))
+ self.verify_capture(scapy_ip_family, packets)
+ self.src_if.assert_nothing_captured()
+
+ def test_reversed(self, scapy_ip_family, stream):
+ """ reverse order reassembly """
+ validate_scapy_ip_family(scapy_ip_family)
+
+ fragments = list(stream)
+ fragments.reverse()
+
+ self.pg_enable_capture()
+ self.src_if.add_stream(fragments)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(len(self.packet_infos))
+ self.verify_capture(scapy_ip_family, packets)
+ self.src_if.assert_nothing_captured()
+
+ # run it all again to verify correctness
+ self.pg_enable_capture()
+ self.src_if.add_stream(fragments)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(len(self.packet_infos))
+ self.verify_capture(scapy_ip_family, packets)
+ self.src_if.assert_nothing_captured()
+
+ def test_timeout_inline(self, scapy_ip_family, stream,
+ dropped_packet_indexes):
+ """ timeout (inline) """
+ validate_scapy_ip_family(scapy_ip_family)
+ is_ip6 = 1 if scapy_ip_family == IPv6 else 0
+
+ self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
+ expire_walk_interval_ms=10000,
+ is_ip6=is_ip6)
+
+ self.pg_enable_capture()
+ self.src_if.add_stream(stream)
+ self.pg_start()
+
+ packets = self.dst_if.get_capture(
+ len(self.pkt_infos) - len(dropped_packet_indexes))
+ self.verify_capture(scapy_ip_family, packets,
+ dropped_packet_indexes)
+
+
+class TestIPv4Reassembly(TestIPReassemblyMixin, VppTestCase):