class ExportJson():
"""Class handling the json data setting and export."""
- ROBOT_LIBRARY_SCOPE = u"GLOBAL"
+ ROBOT_LIBRARY_SCOPE = "GLOBAL"
def __init__(self):
"""Declare required fields, cache output dir.
Also memorize schema validator instances.
"""
- self.output_dir = BuiltIn().get_variable_value(u"\\${OUTPUT_DIR}", ".")
+ self.output_dir = BuiltIn().get_variable_value("\\${OUTPUT_DIR}", ".")
self.file_path = None
self.data = None
self.validators = get_validators()
:rtype: str
:raises RuntimeError: If the test tags does not contain expected values.
"""
- tags = self.data[u"tags"]
+ tags = self.data["tags"]
# First 5 options are specific for VPP tests.
- if u"DEVICETEST" in tags:
- test_type = u"device"
- elif u"LDP_NGINX" in tags:
- test_type = u"hoststack"
- elif u"HOSTSTACK" in tags:
- test_type = u"hoststack"
- elif u"GSO_TRUE" in tags or u"GSO_FALSE" in tags:
- test_type = u"mrr"
- elif u"RECONF" in tags:
- test_type = u"reconf"
+ if "DEVICETEST" in tags:
+ test_type = "device"
+ elif "LDP_NGINX" in tags:
+ test_type = "hoststack"
+ elif "HOSTSTACK" in tags:
+ test_type = "hoststack"
+ elif "GSO_TRUE" in tags or "GSO_FALSE" in tags:
+ test_type = "mrr"
+ elif "RECONF" in tags:
+ test_type = "reconf"
# The remaining 3 options could also apply to DPDK and TRex tests.
- elif u"SOAK" in tags:
- test_type = u"soak"
- elif u"NDRPDR" in tags:
- test_type = u"ndrpdr"
- elif u"MRR" in tags:
- test_type = u"mrr"
+ elif "SOAK" in tags:
+ test_type = "soak"
+ elif "NDRPDR" in tags:
+ test_type = "ndrpdr"
+ elif "MRR" in tags:
+ test_type = "mrr"
else:
raise RuntimeError(f"Unable to infer test type from tags: {tags}")
return test_type
new_file_path = write_output(self.file_path, self.data)
# Data is going to be cleared (as a sign that export succeeded),
# so this is the last chance to detect if it was for a test case.
- is_testcase = u"result" in self.data
+ is_testcase = "result" in self.data
self.data = None
# Validation for output goes here when ready.
self.file_path = None
if is_testcase:
- validate(new_file_path, self.validators[u"tc_info"])
+ validate(new_file_path, self.validators["tc_info"])
def warn_on_bad_export(self):
"""If bad state is detected, log a warning and clean up state."""
"""
self.warn_on_bad_export()
start_time = datetime.datetime.utcnow().strftime(
- u"%Y-%m-%dT%H:%M:%S.%fZ"
+ "%Y-%m-%dT%H:%M:%S.%fZ"
)
- suite_name = BuiltIn().get_variable_value(u"\\${SUITE_NAME}")
- suite_id = suite_name.lower().replace(u" ", u"_")
- suite_path_part = os.path.join(*suite_id.split(u"."))
+ suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
+ suite_id = suite_name.lower().replace(" ", "_")
+ suite_path_part = os.path.join(*suite_id.split("."))
output_dir = self.output_dir
self.file_path = os.path.join(
- output_dir, suite_path_part, u"setup.info.json"
+ output_dir, suite_path_part, "setup.info.json"
)
self.data = dict()
- self.data[u"version"] = Constants.MODEL_VERSION
- self.data[u"start_time"] = start_time
- self.data[u"suite_name"] = suite_name
- self.data[u"suite_documentation"] = BuiltIn().get_variable_value(
- u"\\${SUITE_DOCUMENTATION}"
+ self.data["version"] = Constants.MODEL_VERSION
+ self.data["start_time"] = start_time
+ self.data["suite_name"] = suite_name
+ self.data["suite_documentation"] = BuiltIn().get_variable_value(
+ "\\${SUITE_DOCUMENTATION}"
)
# "end_time" and "duration" are added on flush.
- self.data[u"hosts"] = set()
- self.data[u"telemetry"] = list()
+ self.data["hosts"] = set()
+ self.data["telemetry"] = list()
def start_test_export(self):
"""Set new file path, initialize data to minimal tree for the test case.
"""
self.warn_on_bad_export()
start_time = datetime.datetime.utcnow().strftime(
- u"%Y-%m-%dT%H:%M:%S.%fZ"
+ "%Y-%m-%dT%H:%M:%S.%fZ"
)
- suite_name = BuiltIn().get_variable_value(u"\\${SUITE_NAME}")
- suite_id = suite_name.lower().replace(u" ", u"_")
- suite_path_part = os.path.join(*suite_id.split(u"."))
- test_name = BuiltIn().get_variable_value(u"\\${TEST_NAME}")
+ suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
+ suite_id = suite_name.lower().replace(" ", "_")
+ suite_path_part = os.path.join(*suite_id.split("."))
+ test_name = BuiltIn().get_variable_value("\\${TEST_NAME}")
self.file_path = os.path.join(
self.output_dir, suite_path_part,
- test_name.lower().replace(u" ", u"_") + u".info.json"
+ test_name.lower().replace(" ", "_") + ".info.json"
)
self.data = dict()
- self.data[u"version"] = Constants.MODEL_VERSION
- self.data[u"start_time"] = start_time
- self.data[u"suite_name"] = suite_name
- self.data[u"test_name"] = test_name
- test_doc = BuiltIn().get_variable_value(u"\\${TEST_DOCUMENTATION}", u"")
- self.data[u"test_documentation"] = test_doc
+ self.data["version"] = Constants.MODEL_VERSION
+ self.data["start_time"] = start_time
+ self.data["suite_name"] = suite_name
+ self.data["test_name"] = test_name
+ test_doc = BuiltIn().get_variable_value("\\${TEST_DOCUMENTATION}", "")
+ self.data["test_documentation"] = test_doc
# "test_type" is added on flush.
# "tags" is detected and added on flush.
# "end_time" and "duration" is added on flush.
# Robot status and message are added on flush.
- self.data[u"result"] = dict(type=u"unknown")
- self.data[u"hosts"] = BuiltIn().get_variable_value(u"\\${hosts}")
- self.data[u"telemetry"] = list()
+ self.data["result"] = dict(type="unknown")
+ self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
+ self.data["telemetry"] = list()
export_dut_type_and_version()
export_tg_type_and_version()
"""
self.warn_on_bad_export()
start_time = datetime.datetime.utcnow().strftime(
- u"%Y-%m-%dT%H:%M:%S.%fZ"
+ "%Y-%m-%dT%H:%M:%S.%fZ"
)
- suite_name = BuiltIn().get_variable_value(u"\\${SUITE_NAME}")
- suite_id = suite_name.lower().replace(u" ", u"_")
- suite_path_part = os.path.join(*suite_id.split(u"."))
+ suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
+ suite_id = suite_name.lower().replace(" ", "_")
+ suite_path_part = os.path.join(*suite_id.split("."))
self.file_path = os.path.join(
- self.output_dir, suite_path_part, u"teardown.info.json"
+ self.output_dir, suite_path_part, "teardown.info.json"
)
self.data = dict()
- self.data[u"version"] = Constants.MODEL_VERSION
- self.data[u"start_time"] = start_time
- self.data[u"suite_name"] = suite_name
+ self.data["version"] = Constants.MODEL_VERSION
+ self.data["start_time"] = start_time
+ self.data["suite_name"] = suite_name
# "end_time" and "duration" is added on flush.
- self.data[u"hosts"] = BuiltIn().get_variable_value(u"\\${hosts}")
- self.data[u"telemetry"] = list()
+ self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
+ self.data["telemetry"] = list()
def finalize_suite_setup_export(self):
"""Add the missing fields to data. Do not write yet.
Should be run at the end of suite setup.
The write is done at next start (or at the end of global teardown).
"""
- end_time = datetime.datetime.utcnow().strftime(u"%Y-%m-%dT%H:%M:%S.%fZ")
- self.data[u"hosts"] = BuiltIn().get_variable_value(u"\\${hosts}")
- self.data[u"end_time"] = end_time
+ end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
+ self.data["end_time"] = end_time
self.export_pending_data()
def finalize_test_export(self):
The write is done at next start (or at the end of global teardown).
"""
- end_time = datetime.datetime.utcnow().strftime(u"%Y-%m-%dT%H:%M:%S.%fZ")
- message = BuiltIn().get_variable_value(u"\\${TEST_MESSAGE}")
- test_tags = BuiltIn().get_variable_value(u"\\${TEST_TAGS}")
- self.data[u"end_time"] = end_time
- start_float = parse(self.data[u"start_time"]).timestamp()
- end_float = parse(self.data[u"end_time"]).timestamp()
- self.data[u"duration"] = end_float - start_float
- self.data[u"tags"] = list(test_tags)
- self.data[u"message"] = message
+ end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ message = BuiltIn().get_variable_value("\\${TEST_MESSAGE}")
+ test_tags = BuiltIn().get_variable_value("\\${TEST_TAGS}")
+ self.data["end_time"] = end_time
+ start_float = parse(self.data["start_time"]).timestamp()
+ end_float = parse(self.data["end_time"]).timestamp()
+ self.data["duration"] = end_float - start_float
+ self.data["tags"] = list(test_tags)
+ self.data["message"] = message
self.process_passed()
self.process_test_name()
self.process_results()
(but before the explicit write in the global suite teardown).
The write is done at next start (or explicitly for global teardown).
"""
- end_time = datetime.datetime.utcnow().strftime(u"%Y-%m-%dT%H:%M:%S.%fZ")
- self.data[u"end_time"] = end_time
+ end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ self.data["end_time"] = end_time
self.export_pending_data()
def process_test_name(self):
:raises RuntimeError: If the data does not contain expected values.
"""
- suite_part = self.data.pop(u"suite_name").lower().replace(u" ", u"_")
- if u"test_name" not in self.data:
+ suite_part = self.data.pop("suite_name").lower().replace(" ", "_")
+ if "test_name" not in self.data:
# There will be no test_id, provide suite_id instead.
- self.data[u"suite_id"] = suite_part
+ self.data["suite_id"] = suite_part
return
- test_part = self.data.pop(u"test_name").lower().replace(u" ", u"_")
- self.data[u"test_id"] = f"{suite_part}.{test_part}"
- tags = self.data[u"tags"]
+ test_part = self.data.pop("test_name").lower().replace(" ", "_")
+ self.data["test_id"] = f"{suite_part}.{test_part}"
+ tags = self.data["tags"]
# Test name does not contain thread count.
- subparts = test_part.split(u"c-", 1)
- if len(subparts) < 2 or subparts[0][-2:-1] != u"-":
+ subparts = test_part.split("c-", 1)
+ if len(subparts) < 2 or subparts[0][-2:-1] != "-":
# Physical core count not detected, assume it is a TRex test.
- if u"--" not in test_part:
+ if "--" not in test_part:
raise RuntimeError(f"Cores not found for {subparts}")
- short_name = test_part.split(u"--", 1)[1]
+ short_name = test_part.split("--", 1)[1]
else:
short_name = subparts[1]
# Add threads to test_part.
- core_part = subparts[0][-1] + u"c"
+ core_part = subparts[0][-1] + "c"
for tag in tags:
tag = tag.lower()
- if len(tag) == 4 and core_part == tag[2:] and tag[1] == u"t":
+ if len(tag) == 4 and core_part == tag[2:] and tag[1] == "t":
test_part = test_part.replace(f"-{core_part}-", f"-{tag}-")
break
else:
f"Threads not found for {test_part} tags {tags}"
)
# For long name we need NIC model, which is only in suite name.
- last_suite_part = suite_part.split(u".")[-1]
+ last_suite_part = suite_part.split(".")[-1]
# Short name happens to be the suffix we want to ignore.
prefix_part = last_suite_part.split(short_name)[0]
# Also remove the trailing dash.
prefix_part = prefix_part[:-1]
# Throw away possible link prefix such as "1n1l-".
- nic_code = prefix_part.split(u"-", 1)[-1]
+ nic_code = prefix_part.split("-", 1)[-1]
nic_short = Constants.NIC_CODE_TO_SHORT_NAME[nic_code]
long_name = f"{nic_short}-{test_part}"
# Set test type.
test_type = self._detect_test_type()
- self.data[u"test_type"] = test_type
+ self.data["test_type"] = test_type
# Remove trailing test type from names (if present).
short_name = short_name.split(f"-{test_type}")[0]
long_name = long_name.split(f"-{test_type}")[0]
# Store names.
- self.data[u"test_name_short"] = short_name
- self.data[u"test_name_long"] = long_name
+ self.data["test_name_short"] = short_name
+ self.data["test_name_long"] = long_name
def process_passed(self):
"""Process the test status information as boolean.
Boolean is used to make post processing more efficient.
In case the test status is PASS, we will truncate the test message.
"""
- status = BuiltIn().get_variable_value(u"\\${TEST_STATUS}")
+ status = BuiltIn().get_variable_value("\\${TEST_STATUS}")
if status is not None:
- self.data[u"passed"] = (status == u"PASS")
- if self.data[u"passed"]:
+ self.data["passed"] = (status == "PASS")
+ if self.data["passed"]:
# Also truncate success test messages.
- self.data[u"message"] = u""
+ self.data["message"] = ""
def process_results(self):
"""Process measured results.
telemetry_compress = compress(telemetry_encode, level=9)
telemetry_base64 = b2a_base64(telemetry_compress, newline=False)
self.data["telemetry"] = [telemetry_base64.decode()]
- if u"result" not in self.data:
+ if "result" not in self.data:
return
- result_node = self.data[u"result"]
- result_type = result_node[u"type"]
- if result_type == u"unknown":
+ result_node = self.data["result"]
+ result_type = result_node["type"]
+ if result_type == "unknown":
# Device or something else not supported.
return
- # Compute avg and stdev for mrr.
- if result_type == u"mrr":
- rate_node = result_node[u"receive_rate"][u"rate"]
- stats = AvgStdevStats.for_runs(rate_node[u"values"])
- rate_node[u"avg"] = stats.avg
- rate_node[u"stdev"] = stats.stdev
+ # Compute avg and stdev for mrr (rate and bandwidth).
+ if result_type == "mrr":
+ for node_name in ("rate", "bandwidth"):
+ node = result_node["receive_rate"].get(node_name, None)
+ if node is not None:
+ stats = AvgStdevStats.for_runs(node["values"])
+ node["avg"] = stats.avg
+ node["stdev"] = stats.stdev
return
# Multiple processing steps for ndrpdr.
- if result_type != u"ndrpdr":
+ if result_type != "ndrpdr":
return
# Filter out invalid latencies.
- for which_key in (u"latency_forward", u"latency_reverse"):
+ for which_key in ("latency_forward", "latency_reverse"):
if which_key not in result_node:
# Probably just an unidir test.
continue
- for load in (u"pdr_0", u"pdr_10", u"pdr_50", u"pdr_90"):
- if result_node[which_key][load][u"max"] <= 0:
+ for load in ("pdr_0", "pdr_10", "pdr_50", "pdr_90"):
+ if result_node[which_key][load]["max"] <= 0:
# One invalid number is enough to remove all loads.
break
else: