+ remaining_time = timeout
+ capture = None
+ name = self.name if remark is None else "%s (%s)" % (self.name, remark)
+ based_on = "based on provided argument"
+ if expected_count is None:
+ expected_count = self.test.get_packet_count_for_if_idx(self.sw_if_index)
+ based_on = "based on stored packet_infos"
+ if expected_count == 0:
+ raise Exception(
+ "Internal error, expected packet count for %s is 0!" % name
+ )
+ self.test.logger.debug(
+ "Expecting to capture %s (%s) packets on %s"
+ % (expected_count, based_on, name)
+ )
+ while remaining_time > 0:
+ before = time.time()
+ capture = self._get_capture(remaining_time, filter_out_fn)
+ elapsed_time = time.time() - before
+ if capture:
+ if len(capture.res) == expected_count:
+ # bingo, got the packets we expected
+ return capture
+ elif len(capture.res) > expected_count:
+ self.test.logger.error(
+ ppc(
+ f"Unexpected packets captured, got {len(capture.res)}, expected {expected_count}:",
+ capture,
+ )
+ )
+ break
+ else:
+ self.test.logger.debug(
+ "Partial capture containing %s "
+ "packets doesn't match expected "
+ "count %s (yet?)" % (len(capture.res), expected_count)
+ )
+ elif expected_count == 0:
+ # bingo, got None as we expected - return empty capture
+ return PacketList()
+ remaining_time -= elapsed_time
+ if capture:
+ self.generate_debug_aid("count-mismatch")
+ if len(capture) > 0 and 0 == expected_count:
+ rem = f"\n{remark}" if remark else ""
+ raise UnexpectedPacketError(
+ capture[0],
+ f"\n({len(capture)} packets captured in total){rem} on {name}",
+ )
+ msg = f"Captured packets mismatch, captured {len(capture.res)} packets, expected {expected_count} packets on {name}:"
+ raise Exception(f"{ppc(msg, capture)}")
+ else:
+ if 0 == expected_count:
+ return
+ raise Exception(f"No packets captured on {name} (timeout = {timeout}s)")
+
+ def assert_nothing_captured(
+ self, timeout=1, remark=None, filter_out_fn=is_ipv6_misc
+ ):
+ """Assert that nothing unfiltered was captured on interface
+
+ :param remark: remark printed into debug logs
+ :param filter_out_fn: filter applied to each packet, packets for which
+ the filter returns True are removed from capture
+ """
+ capture = self.get_capture(
+ 0, timeout=timeout, remark=remark, filter_out_fn=filter_out_fn
+ )
+ if not capture or len(capture.res) == 0:
+ # junk filtered out, we're good
+ return