- if os.path.isfile(self.out_path):
- name = "%s/history.[timestamp:%f].[%s-counter:%04d].%s" % \
- (self.test.tempdir,
- time.time(),
- self.name,
- self.out_history_counter,
- self._out_file)
- self.test.logger.debug("Renaming %s->%s" %
- (self.out_path, name))
- os.rename(self.out_path, name)
- except:
- pass
+ if os.path.isfile(path):
+ self.test.logger.debug(f"Creating hard link {path}->{name}")
+ os.link(path, name)
+ except OSError:
+ self.test.logger.debug(
+ f"OSError: Could not create hard link {path}->{name}"
+ )
+
+ def remove_old_pcap_file(self, path):
+ self.wait_for_pg_stop()
+ self.test.unlink_testcase_file(self.test, Path(path))
+ return
+
+ def decode_pcap_files(self, pcap_dir, filename_prefix):
+ # Generate tshark packet trace of testcase pcap files
+ pg_decode = f"{pcap_dir}/pcap-decode-{filename_prefix}.txt"
+ if os.path.isfile(pg_decode):
+ self.test.logger.debug(
+ f"The pg streams decode file already exists: {pg_decode}"
+ )
+ return
+ self.test.logger.debug(
+ f"Generating testcase pg streams decode file: {pg_decode}"
+ )
+ ts_opts = "-Vr"
+ for p in sorted(Path(pcap_dir).glob(f"{filename_prefix}*.pcap")):
+ self.test.logger.debug(f"Decoding {p}")
+ with open(f"{pg_decode}", "a", buffering=1) as f:
+ print(f"tshark {ts_opts} {p}", file=f)
+ tshark(ts_opts, f"{p}", _out=f)
+ print("", file=f)
+
+ def enable_capture(self):
+ """Enable capture on this packet-generator interface
+ of at most n packets.
+ If n < 0, this is no limit
+ """
+ # disable the capture to flush the capture
+ self.disable_capture()
+ self.remove_old_pcap_file(self.out_path)