-# Copyright (c) 2022 Cisco and/or its affiliates.
+# Copyright (c) 2024 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
import datetime
import os.path
+from binascii import b2a_base64
from dateutil.parser import parse
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn
+from zlib import compress
from resources.libraries.python.Constants import Constants
-from resources.libraries.python.jumpavg.AvgStdevStats import AvgStdevStats
+from resources.libraries.python.jumpavg import AvgStdevStats
from resources.libraries.python.model.ExportResult import (
export_dut_type_and_version, export_tg_type_and_version
)
class ExportJson():
"""Class handling the json data setting and export."""
- ROBOT_LIBRARY_SCOPE = u"GLOBAL"
+ ROBOT_LIBRARY_SCOPE = "GLOBAL"
def __init__(self):
"""Declare required fields, cache output dir.
Also memorize schema validator instances.
"""
- self.output_dir = BuiltIn().get_variable_value(u"\\${OUTPUT_DIR}", ".")
+ self.output_dir = BuiltIn().get_variable_value("\\${OUTPUT_DIR}", ".")
self.file_path = None
self.data = None
self.validators = get_validators()
:rtype: str
:raises RuntimeError: If the test tags do not contain expected values.
"""
- tags = self.data[u"tags"]
+ tags = self.data["tags"]
# First 5 options are specific for VPP tests.
- if u"DEVICETEST" in tags:
- test_type = u"device"
- elif u"LDP_NGINX" in tags:
- test_type = u"vsap"
- elif u"HOSTSTACK" in tags:
- test_type = u"hoststack"
- elif u"GSO_TRUE" in tags or u"GSO_FALSE" in tags:
- test_type = u"gso"
- elif u"RECONF" in tags:
- test_type = u"reconf"
+ if "DEVICETEST" in tags:
+ test_type = "device"
+ elif "LDP_NGINX" in tags:
+ test_type = "hoststack"
+ elif "HOSTSTACK" in tags:
+ test_type = "hoststack"
+ elif "GSO_TRUE" in tags or "GSO_FALSE" in tags:
+ test_type = "mrr"
+ elif "RECONF" in tags:
+ test_type = "reconf"
# The remaining 3 options could also apply to DPDK and TRex tests.
- elif u"SOAK" in tags:
- test_type = u"soak"
- elif u"NDRPDR" in tags:
- test_type = u"ndrpdr"
- elif u"MRR" in tags:
- test_type = u"mrr"
+ elif "SOAK" in tags:
+ test_type = "soak"
+ elif "NDRPDR" in tags:
+ test_type = "ndrpdr"
+ elif "MRR" in tags:
+ test_type = "mrr"
else:
raise RuntimeError(f"Unable to infer test type from tags: {tags}")
return test_type
new_file_path = write_output(self.file_path, self.data)
# Data is going to be cleared (as a sign that export succeeded),
# so this is the last chance to detect whether it was for a test case.
- is_testcase = u"result" in self.data
+ is_testcase = "result" in self.data
self.data = None
# Validation for output goes here when ready.
self.file_path = None
if is_testcase:
- validate(new_file_path, self.validators[u"tc_info"])
+ validate(new_file_path, self.validators["tc_info"])
def warn_on_bad_export(self):
"""If bad state is detected, log a warning and clean up state."""
"""
self.warn_on_bad_export()
start_time = datetime.datetime.utcnow().strftime(
- u"%Y-%m-%dT%H:%M:%S.%fZ"
+ "%Y-%m-%dT%H:%M:%S.%fZ"
)
- suite_name = BuiltIn().get_variable_value(u"\\${SUITE_NAME}")
- suite_id = suite_name.lower().replace(u" ", u"_")
- suite_path_part = os.path.join(*suite_id.split(u"."))
+ suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
+ suite_id = suite_name.lower().replace(" ", "_")
+ suite_path_part = os.path.join(*suite_id.split("."))
output_dir = self.output_dir
self.file_path = os.path.join(
- output_dir, suite_path_part, u"setup.info.json"
+ output_dir, suite_path_part, "setup.info.json"
)
self.data = dict()
- self.data[u"version"] = Constants.MODEL_VERSION
- self.data[u"start_time"] = start_time
- self.data[u"suite_name"] = suite_name
- self.data[u"suite_documentation"] = BuiltIn().get_variable_value(
- u"\\${SUITE_DOCUMENTATION}"
+ self.data["version"] = Constants.MODEL_VERSION
+ self.data["start_time"] = start_time
+ self.data["suite_name"] = suite_name
+ self.data["suite_documentation"] = BuiltIn().get_variable_value(
+ "\\${SUITE_DOCUMENTATION}"
)
# "end_time" and "duration" are added on flush.
- self.data[u"hosts"] = set()
- self.data[u"telemetry"] = list()
+ self.data["hosts"] = set()
+ self.data["telemetry"] = list()
def start_test_export(self):
"""Set new file path, initialize data to minimal tree for the test case.
"""
self.warn_on_bad_export()
start_time = datetime.datetime.utcnow().strftime(
- u"%Y-%m-%dT%H:%M:%S.%fZ"
+ "%Y-%m-%dT%H:%M:%S.%fZ"
)
- suite_name = BuiltIn().get_variable_value(u"\\${SUITE_NAME}")
- suite_id = suite_name.lower().replace(u" ", u"_")
- suite_path_part = os.path.join(*suite_id.split(u"."))
- test_name = BuiltIn().get_variable_value(u"\\${TEST_NAME}")
+ suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
+ suite_id = suite_name.lower().replace(" ", "_")
+ suite_path_part = os.path.join(*suite_id.split("."))
+ test_name = BuiltIn().get_variable_value("\\${TEST_NAME}")
self.file_path = os.path.join(
self.output_dir, suite_path_part,
- test_name.lower().replace(u" ", u"_") + u".info.json"
+ test_name.lower().replace(" ", "_") + ".info.json"
)
self.data = dict()
- self.data[u"version"] = Constants.MODEL_VERSION
- self.data[u"start_time"] = start_time
- self.data[u"suite_name"] = suite_name
- self.data[u"test_name"] = test_name
- test_doc = BuiltIn().get_variable_value(u"\\${TEST_DOCUMENTATION}", u"")
- self.data[u"test_documentation"] = test_doc
+ self.data["version"] = Constants.MODEL_VERSION
+ self.data["start_time"] = start_time
+ self.data["suite_name"] = suite_name
+ self.data["test_name"] = test_name
+ test_doc = BuiltIn().get_variable_value("\\${TEST_DOCUMENTATION}", "")
+ self.data["test_documentation"] = test_doc
# "test_type" is added on flush.
# "tags" is detected and added on flush.
# "end_time" and "duration" is added on flush.
# Robot status and message are added on flush.
- self.data[u"result"] = dict(type=u"unknown")
- self.data[u"hosts"] = BuiltIn().get_variable_value(u"\\${hosts}")
- self.data[u"telemetry"] = list()
+ self.data["result"] = dict(type="unknown")
+ self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
+ self.data["telemetry"] = list()
export_dut_type_and_version()
export_tg_type_and_version()
"""
self.warn_on_bad_export()
start_time = datetime.datetime.utcnow().strftime(
- u"%Y-%m-%dT%H:%M:%S.%fZ"
+ "%Y-%m-%dT%H:%M:%S.%fZ"
)
- suite_name = BuiltIn().get_variable_value(u"\\${SUITE_NAME}")
- suite_id = suite_name.lower().replace(u" ", u"_")
- suite_path_part = os.path.join(*suite_id.split(u"."))
+ suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
+ suite_id = suite_name.lower().replace(" ", "_")
+ suite_path_part = os.path.join(*suite_id.split("."))
self.file_path = os.path.join(
- self.output_dir, suite_path_part, u"teardown.info.json"
+ self.output_dir, suite_path_part, "teardown.info.json"
)
self.data = dict()
- self.data[u"version"] = Constants.MODEL_VERSION
- self.data[u"start_time"] = start_time
- self.data[u"suite_name"] = suite_name
+ self.data["version"] = Constants.MODEL_VERSION
+ self.data["start_time"] = start_time
+ self.data["suite_name"] = suite_name
# "end_time" and "duration" is added on flush.
- self.data[u"hosts"] = BuiltIn().get_variable_value(u"\\${hosts}")
- self.data[u"telemetry"] = list()
+ self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
+ self.data["telemetry"] = list()
def finalize_suite_setup_export(self):
"""Add the missing fields to data. Do not write yet.
Should be run at the end of suite setup.
The write is done at next start (or at the end of global teardown).
"""
- end_time = datetime.datetime.utcnow().strftime(u"%Y-%m-%dT%H:%M:%S.%fZ")
- self.data[u"hosts"] = BuiltIn().get_variable_value(u"\\${hosts}")
- self.data[u"end_time"] = end_time
+ end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
+ self.data["end_time"] = end_time
self.export_pending_data()
def finalize_test_export(self):
The write is done at next start (or at the end of global teardown).
"""
- end_time = datetime.datetime.utcnow().strftime(u"%Y-%m-%dT%H:%M:%S.%fZ")
- message = BuiltIn().get_variable_value(u"\\${TEST_MESSAGE}")
- test_tags = BuiltIn().get_variable_value(u"\\${TEST_TAGS}")
- self.data[u"end_time"] = end_time
- start_float = parse(self.data[u"start_time"]).timestamp()
- end_float = parse(self.data[u"end_time"]).timestamp()
- self.data[u"duration"] = end_float - start_float
- self.data[u"tags"] = list(test_tags)
- self.data[u"message"] = message
+ end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ message = BuiltIn().get_variable_value("\\${TEST_MESSAGE}")
+ test_tags = BuiltIn().get_variable_value("\\${TEST_TAGS}")
+ self.data["end_time"] = end_time
+ start_float = parse(self.data["start_time"]).timestamp()
+ end_float = parse(self.data["end_time"]).timestamp()
+ self.data["duration"] = end_float - start_float
+ self.data["tags"] = list(test_tags)
+ self.data["message"] = message
self.process_passed()
self.process_test_name()
self.process_results()
(but before the explicit write in the global suite teardown).
The write is done at next start (or explicitly for global teardown).
"""
- end_time = datetime.datetime.utcnow().strftime(u"%Y-%m-%dT%H:%M:%S.%fZ")
- self.data[u"end_time"] = end_time
+ end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ self.data["end_time"] = end_time
self.export_pending_data()
def process_test_name(self):
:raises RuntimeError: If the data does not contain expected values.
"""
- suite_part = self.data.pop(u"suite_name").lower().replace(u" ", u"_")
- if u"test_name" not in self.data:
+ suite_part = self.data.pop("suite_name").lower().replace(" ", "_")
+ if "test_name" not in self.data:
# There will be no test_id, provide suite_id instead.
- self.data[u"suite_id"] = suite_part
+ self.data["suite_id"] = suite_part
return
- test_part = self.data.pop(u"test_name").lower().replace(u" ", u"_")
- self.data[u"test_id"] = f"{suite_part}.{test_part}"
- tags = self.data[u"tags"]
+ test_part = self.data.pop("test_name").lower().replace(" ", "_")
+ self.data["test_id"] = f"{suite_part}.{test_part}"
+ tags = self.data["tags"]
# Test name does not contain thread count.
- subparts = test_part.split(u"c-", 1)
- if len(subparts) < 2 or subparts[0][-2:-1] != u"-":
+ subparts = test_part.split("-")
+ if any("tg" in s for s in subparts) and subparts[1] == "":
# Physical core count not detected, assume it is a TRex test.
- if u"--" not in test_part:
- raise RuntimeError(f"Cores not found for {subparts}")
- short_name = test_part.split(u"--", 1)[1]
+ if "--" not in test_part:
+ raise RuntimeError(f"Invalid TG test name for: {subparts}")
+ short_name = test_part.split("--", 1)[1]
else:
- short_name = subparts[1]
+ short_name = "-".join(subparts[2:])
# Add threads to test_part.
- core_part = subparts[0][-1] + u"c"
- for tag in tags:
- tag = tag.lower()
- if len(tag) == 4 and core_part == tag[2:] and tag[1] == u"t":
- test_part = test_part.replace(f"-{core_part}-", f"-{tag}-")
- break
- else:
- raise RuntimeError(
- f"Threads not found for {test_part} tags {tags}"
- )
+ core_part = subparts[1]
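+ # Pick the thread-count tag matching the core part, e.g. "4T2C"
+ # contains "2C". If no tag matches, the lookup raises IndexError.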
+ tag = list(filter(lambda t: subparts[1].upper() in t, tags))[0]
+ test_part = test_part.replace(f"-{core_part}-", f"-{tag.lower()}-")
# For long name we need NIC model, which is only in suite name.
- last_suite_part = suite_part.split(u".")[-1]
+ last_suite_part = suite_part.split(".")[-1]
# Short name happens to be the suffix we want to ignore.
prefix_part = last_suite_part.split(short_name)[0]
# Also remove the trailing dash.
prefix_part = prefix_part[:-1]
# Throw away possible link prefix such as "1n1l-".
- nic_code = prefix_part.split(u"-", 1)[-1]
+ nic_code = prefix_part.split("-", 1)[-1]
nic_short = Constants.NIC_CODE_TO_SHORT_NAME[nic_code]
long_name = f"{nic_short}-{test_part}"
# Set test type.
test_type = self._detect_test_type()
- self.data[u"test_type"] = test_type
+ self.data["test_type"] = test_type
# Remove trailing test type from names (if present).
short_name = short_name.split(f"-{test_type}")[0]
long_name = long_name.split(f"-{test_type}")[0]
# Store names.
- self.data[u"test_name_short"] = short_name
- self.data[u"test_name_long"] = long_name
+ self.data["test_name_short"] = short_name
+ self.data["test_name_long"] = long_name
def process_passed(self):
"""Process the test status information as boolean.
A boolean is used to make post-processing more efficient.
If the test status is PASS, the test message is truncated.
"""
- status = BuiltIn().get_variable_value(u"\\${TEST_STATUS}")
+ status = BuiltIn().get_variable_value("\\${TEST_STATUS}")
if status is not None:
- self.data[u"passed"] = (status == u"PASS")
- if self.data[u"passed"]:
+ self.data["passed"] = (status == "PASS")
+ if self.data["passed"]:
# Also truncate success test messages.
- self.data[u"message"] = u""
+ self.data["message"] = ""
def process_results(self):
"""Process measured results.
Results are precomputed here to avoid future post-processing,
making them more efficient to consume.
"""
- if u"result" not in self.data:
+ if self.data["telemetry"]:
+ telemetry_encode = "\n".join(self.data["telemetry"]).encode()
+ telemetry_compress = compress(telemetry_encode, level=9)
+ telemetry_base64 = b2a_base64(telemetry_compress, newline=False)
+ self.data["telemetry"] = [telemetry_base64.decode()]
+ if "result" not in self.data:
return
- result_node = self.data[u"result"]
- result_type = result_node[u"type"]
- if result_type == u"unknown":
+ result_node = self.data["result"]
+ result_type = result_node["type"]
+ if result_type == "unknown":
# Device or something else not supported.
return
- # Compute avg and stdev for mrr.
- if result_type == u"mrr":
- rate_node = result_node[u"receive_rate"][u"rate"]
- stats = AvgStdevStats.for_runs(rate_node[u"values"])
- rate_node[u"avg"] = stats.avg
- rate_node[u"stdev"] = stats.stdev
+ # Compute avg and stdev for mrr (rate and bandwidth).
+ if result_type == "mrr":
+ for node_name in ("rate", "bandwidth"):
+ node = result_node["receive_rate"].get(node_name, None)
+ if node is not None:
+ stats = AvgStdevStats.for_runs(node["values"])
+ node["avg"] = stats.avg
+ node["stdev"] = stats.stdev
return
# Multiple processing steps for ndrpdr.
- if result_type != u"ndrpdr":
+ if result_type != "ndrpdr":
return
# Filter out invalid latencies.
- for which_key in (u"latency_forward", u"latency_reverse"):
+ for which_key in ("latency_forward", "latency_reverse"):
if which_key not in result_node:
# Probably just a unidir test.
continue
- for load in (u"pdr_0", u"pdr_10", u"pdr_50", u"pdr_90"):
- if result_node[which_key][load][u"max"] <= 0:
+ for load in ("pdr_0", "pdr_10", "pdr_50", "pdr_90"):
+ if result_node[which_key][load]["max"] <= 0:
# One invalid number is enough to remove all loads.
break
else: