+ remaining_time = timeout
+ capture = None
+ name = self.name if remark is None else "%s (%s)" % (self.name, remark)
+ based_on = "based on provided argument"
+ if expected_count is None:
+ expected_count = \
+ self.test.get_packet_count_for_if_idx(self.sw_if_index)
+ based_on = "based on stored packet_infos"
+ if expected_count == 0:
+ raise Exception(
+ "Internal error, expected packet count for %s is 0!" %
+ name)
+ self.test.logger.debug("Expecting to capture %s (%s) packets on %s" % (
+ expected_count, based_on, name))
+ while remaining_time > 0:
+ before = time.time()
+ capture = self._get_capture(remaining_time, filter_out_fn)
+ elapsed_time = time.time() - before
+ if capture:
+ if len(capture.res) == expected_count:
+ # bingo, got the packets we expected
+ return capture
+ elif len(capture.res) > expected_count:
+ self.test.logger.error(
+ ppc("Unexpected packets captured:", capture))
+ break
+ else:
+ self.test.logger.debug("Partial capture containing %s "
+ "packets doesn't match expected "
+ "count %s (yet?)" %
+ (len(capture.res), expected_count))
+ elif expected_count == 0:
+ # bingo, got None as we expected - return empty capture
+ return PacketList()
+ remaining_time -= elapsed_time
+ if capture:
+ self.generate_debug_aid("count-mismatch")
+ raise Exception("Captured packets mismatch, captured %s packets, "
+ "expected %s packets on %s" %
+ (len(capture.res), expected_count, name))
+ else:
+ raise Exception("No packets captured on %s" % name)
+
+ def assert_nothing_captured(self, remark=None, filter_out_fn=is_ipv6_misc):
+ """ Assert that nothing unfiltered was captured on interface
+
+ :param remark: remark printed into debug logs
+ :param filter_out_fn: filter applied to each packet, packets for which
+ the filter returns True are removed from capture
+ """
+ if os.path.isfile(self.out_path):
+ try:
+ capture = self.get_capture(
+ 0, remark=remark, filter_out_fn=filter_out_fn)
+ if not capture or len(capture.res) == 0:
+ # junk filtered out, we're good
+ return
+ except:
+ pass
+ self.generate_debug_aid("empty-assert")
+ if remark:
+ raise AssertionError(
+ "Non-empty capture file present for interface %s (%s)" %
+ (self.name, remark))
+ else:
+ raise AssertionError("Capture file present for interface %s" %
+ self.name)
+
+ def wait_for_pg_stop(self):
+ # wait till packet-generator is stopped
+ # "show packet-generator" while it is still running gives this:
+ # Name Enabled Count Parameters
+ # pcap0-sw_if_inde Yes 64 limit 64, ...
+ #
+ # also have a 5-minute timeout just in case things go terribly wrong...
+ deadline = time.time() + 300
+ while self.test.vapi.cli('show packet-generator').find("Yes") != -1:
+ self._test.sleep(0.01) # yield
+ if time.time() > deadline:
+ self.test.logger.debug("Timeout waiting for pg to stop")
+ break
+
+ def wait_for_capture_file(self, timeout=1):
+ """
+ Wait until pcap capture file appears