-# <class 'scapy.layers.inet.IP'>
-# <class 'scapy.layers.inet6.IPv6'>
-_scapy_ip_family_types = (IP, IPv6)
-
-
-def validate_scapy_ip_family(scapy_ip_family):
-
- if scapy_ip_family not in _scapy_ip_family_types:
- raise ValueError("'scapy_ip_family' must be of type: %s. Got %s" %
- (_scapy_ip_family_types, scapy_ip_family))
-
-
-class TestIPReassemblyMixin(object):
-
- def verify_capture(self, scapy_ip_family, capture,
- dropped_packet_indexes=None):
- """Verify captured packet stream.
-
- :param list capture: Captured packet stream.
- """
- validate_scapy_ip_family(scapy_ip_family)
-
- if dropped_packet_indexes is None:
- dropped_packet_indexes = []
- info = None
- seen = set()
- for packet in capture:
- try:
- self.logger.debug(ppp("Got packet:", packet))
- ip = packet[scapy_ip_family]
- udp = packet[UDP]
- payload_info = self.payload_to_info(packet[Raw])
- packet_index = payload_info.index
- self.assertTrue(
- packet_index not in dropped_packet_indexes,
- ppp("Packet received, but should be dropped:", packet))
- if packet_index in seen:
- raise Exception(ppp("Duplicate packet received", packet))
- seen.add(packet_index)
- self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
- info = self._packet_infos[packet_index]
- self.assertTrue(info is not None)
- self.assertEqual(packet_index, info.index)
- saved_packet = info.data
- self.assertEqual(ip.src, saved_packet[scapy_ip_family].src)
- self.assertEqual(ip.dst, saved_packet[scapy_ip_family].dst)
- self.assertEqual(udp.payload, saved_packet[UDP].payload)
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- for index in self._packet_infos:
- self.assertTrue(index in seen or index in dropped_packet_indexes,
- "Packet with packet_index %d not received" % index)
-
- def test_disabled(self, scapy_ip_family, stream,
- dropped_packet_indexes):
- """ reassembly disabled """
- validate_scapy_ip_family(scapy_ip_family)
- is_ip6 = 1 if scapy_ip_family == IPv6 else 0
-
- self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10000,
- is_ip6=is_ip6)
-
- self.pg_enable_capture()
- self.src_if.add_stream(stream)
- self.pg_start()
-
- packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
- self.verify_capture(scapy_ip_family, packets, dropped_packet_indexes)
- self.src_if.assert_nothing_captured()
-
- def test_duplicates(self, scapy_ip_family, stream):
- """ duplicate fragments """
- validate_scapy_ip_family(scapy_ip_family)
-
- self.pg_enable_capture()
- self.src_if.add_stream(stream)
- self.pg_start()
-
- packets = self.dst_if.get_capture(len(self.pkt_infos))
- self.verify_capture(scapy_ip_family, packets)
- self.src_if.assert_nothing_captured()
-
- def test_random(self, scapy_ip_family, stream):
- """ random order reassembly """
- validate_scapy_ip_family(scapy_ip_family)
-
- fragments = list(stream)
- shuffle(fragments)
-
- self.pg_enable_capture()
- self.src_if.add_stream(fragments)
- self.pg_start()
-
- packets = self.dst_if.get_capture(len(self.packet_infos))
- self.verify_capture(scapy_ip_family, packets)
- self.src_if.assert_nothing_captured()
-
- # run it all again to verify correctness
- self.pg_enable_capture()
- self.src_if.add_stream(fragments)
- self.pg_start()
-
- packets = self.dst_if.get_capture(len(self.packet_infos))
- self.verify_capture(scapy_ip_family, packets)
- self.src_if.assert_nothing_captured()
-
- def test_reassembly(self, scapy_ip_family, stream):
- """ basic reassembly """
- validate_scapy_ip_family(scapy_ip_family)
-
- self.pg_enable_capture()
- self.src_if.add_stream(stream)
- self.pg_start()
-
- packets = self.dst_if.get_capture(len(self.pkt_infos))
- self.verify_capture(scapy_ip_family, packets)
- self.src_if.assert_nothing_captured()