-
-
-def parse(dirpath: str, fake_value: float) -> Dict[str, List[float]]:
- """Looks for test jsons, extract scalar results.
-
- Files other than .json are skipped, jsons without test_id are skipped.
- If the test failed, four fake values are used as a fake result.
-
- Units are ignored, as both parent and current are tested
- with the same CSIT code so the unit should be identical.
-
- :param dirpath: Path to the directory tree to examine.
- :param fail_value: Fake value to use for test cases that failed.
- :type dirpath: str
- :returns: Mapping from test IDs to list of measured values.
- :rtype: Dict[str, List[float]]
- :raises RuntimeError: On duplicate test ID or unknown test type.
- """
- results = {}
- for root, _, files in os.walk(dirpath):
- for filename in files:
- if not filename.endswith(".json"):
- continue
- filepath = os.path.join(root, filename)
- with open(filepath, "rt", encoding="utf8") as file_in:
- data = json.load(file_in)
- if "test_id" not in data:
- continue
- name = data["test_id"]
- if name in results:
- raise RuntimeError(f"Duplicate: {name}")
- if not data["passed"]:
- results[name] = [fake_value] * 4
- continue
- result_object = data["result"]
- result_type = result_object["type"]
- if result_type == "mrr":
- results[name] = result_object["receive_rate"]["rate"]["values"]
- elif result_type == "ndrpdr":
- results[name] = [result_object["pdr"]["lower"]["rate"]["value"]]
- elif result_type == "soak":
- results[name] = [
- result_object["critical_rate"]["lower"]["rate"]["value"]
- ]
- elif result_type == "reconf":
- results[name] = [result_object["loss"]["time"]["value"]]
- elif result_type == "hoststack":
- results[name] = [result_object["bandwidth"]["value"]]
- else:
- raise RuntimeError(f"Unknown result type: {result_type}")
- return results