1 # Copyright (c) 2024 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
"""Module tracking json in-memory data and saving it to files.

Each test case, suite setup (hierarchical) and teardown has its own file pair.

Validation is performed for output files with available JSON schema.
Validation is performed on data deserialized from disk,
as serialization might have introduced subtle errors.
"""
import datetime
import os
import os.path

from binascii import b2a_base64
from zlib import compress

from dateutil.parser import parse
from robot.api import logger
from robot.libraries.BuiltIn import BuiltIn

from resources.libraries.python.Constants import Constants
from resources.libraries.python.jumpavg import AvgStdevStats
from resources.libraries.python.model.ExportResult import (
    export_dut_type_and_version, export_tg_type_and_version
)
from resources.libraries.python.model.MemDump import write_output
from resources.libraries.python.model.validate import (
    get_validators, validate
)
class export_json:
    """Class handling the json data setting and export."""

    # Single library instance shared by all suites and tests in the run.
    ROBOT_LIBRARY_SCOPE = "GLOBAL"
49 """Declare required fields, cache output dir.
51 Also memorize schema validator instances.
53 self.output_dir = BuiltIn().get_variable_value("\\${OUTPUT_DIR}", ".")
56 self.validators = get_validators()
58 def _detect_test_type(self):
59 """Return test_type, as inferred from robot test tags.
61 :returns: The inferred test type value.
63 :raises RuntimeError: If the test tags does not contain expected values.
65 tags = self.data["tags"]
66 # First 5 options are specific for VPP tests.
67 if "DEVICETEST" in tags:
69 elif "LDP_NGINX" in tags:
70 test_type = "hoststack"
71 elif "HOSTSTACK" in tags:
72 test_type = "hoststack"
73 elif "GSO_TRUE" in tags or "GSO_FALSE" in tags:
75 elif "RECONF" in tags:
77 # The remaining 3 options could also apply to DPDK and TRex tests.
80 elif "NDRPDR" in tags:
85 raise RuntimeError(f"Unable to infer test type from tags: {tags}")
88 def export_pending_data(self):
89 """Write the accumulated data to disk.
91 Create missing directories.
92 Reset both file path and data to avoid writing multiple times.
94 Functions which finalize content for given file are calling this,
95 so make sure each test and non-empty suite setup or teardown
96 is calling this as their last keyword.
98 If no file path is set, do not write anything,
99 as that is the failsafe behavior when caller from unexpected place.
100 Aso do not write anything when EXPORT_JSON constant is false.
102 Regardless of whether data was written, it is cleared.
104 if not Constants.EXPORT_JSON or not self.file_path:
106 self.file_path = None
108 new_file_path = write_output(self.file_path, self.data)
109 # Data is going to be cleared (as a sign that export succeeded),
110 # so this is the last chance to detect if it was for a test case.
111 is_testcase = "result" in self.data
113 # Validation for output goes here when ready.
114 self.file_path = None
116 validate(new_file_path, self.validators["tc_info"])
118 def warn_on_bad_export(self):
119 """If bad state is detected, log a warning and clean up state."""
120 if self.file_path is not None or self.data is not None:
121 logger.warn(f"Previous export not clean, path {self.file_path}")
123 self.file_path = None
125 def start_suite_setup_export(self):
126 """Set new file path, initialize data for the suite setup.
128 This has to be called explicitly at start of suite setup,
129 otherwise Robot likes to postpone initialization
130 until first call by a data-adding keyword.
132 File path is set based on suite.
134 self.warn_on_bad_export()
135 start_time = datetime.datetime.utcnow().strftime(
136 "%Y-%m-%dT%H:%M:%S.%fZ"
138 suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
139 suite_id = suite_name.lower().replace(" ", "_")
140 suite_path_part = os.path.join(*suite_id.split("."))
141 output_dir = self.output_dir
142 self.file_path = os.path.join(
143 output_dir, suite_path_part, "setup.info.json"
146 self.data["version"] = Constants.MODEL_VERSION
147 self.data["start_time"] = start_time
148 self.data["suite_name"] = suite_name
149 self.data["suite_documentation"] = BuiltIn().get_variable_value(
150 "\\${SUITE_DOCUMENTATION}"
152 # "end_time" and "duration" are added on flush.
153 self.data["hosts"] = set()
154 self.data["telemetry"] = list()
156 def start_test_export(self):
157 """Set new file path, initialize data to minimal tree for the test case.
159 It is assumed Robot variables DUT_TYPE and DUT_VERSION
160 are already set (in suite setup) to correct values.
162 This function has to be called explicitly at the start of test setup,
163 otherwise Robot likes to postpone initialization
164 until first call by a data-adding keyword.
166 File path is set based on suite and test.
168 self.warn_on_bad_export()
169 start_time = datetime.datetime.utcnow().strftime(
170 "%Y-%m-%dT%H:%M:%S.%fZ"
172 suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
173 suite_id = suite_name.lower().replace(" ", "_")
174 suite_path_part = os.path.join(*suite_id.split("."))
175 test_name = BuiltIn().get_variable_value("\\${TEST_NAME}")
176 self.file_path = os.path.join(
177 self.output_dir, suite_path_part,
178 test_name.lower().replace(" ", "_") + ".info.json"
181 self.data["version"] = Constants.MODEL_VERSION
182 self.data["start_time"] = start_time
183 self.data["suite_name"] = suite_name
184 self.data["test_name"] = test_name
185 test_doc = BuiltIn().get_variable_value("\\${TEST_DOCUMENTATION}", "")
186 self.data["test_documentation"] = test_doc
187 # "test_type" is added on flush.
188 # "tags" is detected and added on flush.
189 # "end_time" and "duration" is added on flush.
190 # Robot status and message are added on flush.
191 self.data["result"] = dict(type="unknown")
192 self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
193 self.data["telemetry"] = list()
194 export_dut_type_and_version()
195 export_tg_type_and_version()
197 def start_suite_teardown_export(self):
198 """Set new file path, initialize data for the suite teardown.
200 This has to be called explicitly at start of suite teardown,
201 otherwise Robot likes to postpone initialization
202 until first call by a data-adding keyword.
204 File path is set based on suite.
206 self.warn_on_bad_export()
207 start_time = datetime.datetime.utcnow().strftime(
208 "%Y-%m-%dT%H:%M:%S.%fZ"
210 suite_name = BuiltIn().get_variable_value("\\${SUITE_NAME}")
211 suite_id = suite_name.lower().replace(" ", "_")
212 suite_path_part = os.path.join(*suite_id.split("."))
213 self.file_path = os.path.join(
214 self.output_dir, suite_path_part, "teardown.info.json"
217 self.data["version"] = Constants.MODEL_VERSION
218 self.data["start_time"] = start_time
219 self.data["suite_name"] = suite_name
220 # "end_time" and "duration" is added on flush.
221 self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
222 self.data["telemetry"] = list()
224 def finalize_suite_setup_export(self):
225 """Add the missing fields to data. Do not write yet.
227 Should be run at the end of suite setup.
228 The write is done at next start (or at the end of global teardown).
230 end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
231 self.data["hosts"] = BuiltIn().get_variable_value("\\${hosts}")
232 self.data["end_time"] = end_time
233 self.export_pending_data()
235 def finalize_test_export(self):
236 """Add the missing fields to data. Do not write yet.
238 Should be at the end of test teardown, as the implementation
239 reads various Robot variables, some of them only available at teardown.
241 The write is done at next start (or at the end of global teardown).
243 end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
244 message = BuiltIn().get_variable_value("\\${TEST_MESSAGE}")
245 test_tags = BuiltIn().get_variable_value("\\${TEST_TAGS}")
246 self.data["end_time"] = end_time
247 start_float = parse(self.data["start_time"]).timestamp()
248 end_float = parse(self.data["end_time"]).timestamp()
249 self.data["duration"] = end_float - start_float
250 self.data["tags"] = list(test_tags)
251 self.data["message"] = message
252 self.process_passed()
253 self.process_test_name()
254 self.process_results()
255 self.export_pending_data()
257 def finalize_suite_teardown_export(self):
258 """Add the missing fields to data. Do not write yet.
260 Should be run at the end of suite teardown
261 (but before the explicit write in the global suite teardown).
262 The write is done at next start (or explicitly for global teardown).
264 end_time = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
265 self.data["end_time"] = end_time
266 self.export_pending_data()
268 def process_test_name(self):
269 """Replace raw test name with short and long test name and set
272 Perform in-place edits on the data dictionary.
273 Remove raw suite_name and test_name, they are not published.
274 Return early if the data is not for test case.
275 Insert test ID and long and short test name into the data.
276 Besides suite_name and test_name, also test tags are read.
278 Short test name is basically a suite tag, but with NIC driver prefix,
279 if the NIC driver used is not the default one (drv_vfio_pci for VPP
282 Long test name has the following form:
283 {nic_short_name}-{frame_size}-{threads_and_cores}-{suite_part}
284 Lookup in test tags is needed to get the threads value.
285 The threads_and_cores part may be empty, e.g. for TRex tests.
287 Test ID has form {suite_name}.{test_name} where the two names come from
288 Robot variables, converted to lower case and spaces replaces by
291 Test type is set in an internal function.
293 :raises RuntimeError: If the data does not contain expected values.
295 suite_part = self.data.pop("suite_name").lower().replace(" ", "_")
296 if "test_name" not in self.data:
297 # There will be no test_id, provide suite_id instead.
298 self.data["suite_id"] = suite_part
300 test_part = self.data.pop("test_name").lower().replace(" ", "_")
301 self.data["test_id"] = f"{suite_part}.{test_part}"
302 tags = self.data["tags"]
303 # Test name does not contain thread count.
304 subparts = test_part.split("-")
305 if any("tg" in s for s in subparts) and subparts[1] == "":
306 # Physical core count not detected, assume it is a TRex test.
307 if "--" not in test_part:
308 raise RuntimeError(f"Invalid TG test name for: {subparts}")
309 short_name = test_part.split("--", 1)[1]
311 short_name = "-".join(subparts[2:])
312 # Add threads to test_part.
313 core_part = subparts[1]
314 tag = list(filter(lambda t: subparts[1].upper() in t, tags))[0]
315 test_part = test_part.replace(f"-{core_part}-", f"-{tag.lower()}-")
316 # For long name we need NIC model, which is only in suite name.
317 last_suite_part = suite_part.split(".")[-1]
318 # Short name happens to be the suffix we want to ignore.
319 prefix_part = last_suite_part.split(short_name)[0]
320 # Also remove the trailing dash.
321 prefix_part = prefix_part[:-1]
322 # Throw away possible link prefix such as "1n1l-".
323 nic_code = prefix_part.split("-", 1)[-1]
324 nic_short = Constants.NIC_CODE_TO_SHORT_NAME[nic_code]
325 long_name = f"{nic_short}-{test_part}"
327 test_type = self._detect_test_type()
328 self.data["test_type"] = test_type
329 # Remove trailing test type from names (if present).
330 short_name = short_name.split(f"-{test_type}")[0]
331 long_name = long_name.split(f"-{test_type}")[0]
333 self.data["test_name_short"] = short_name
334 self.data["test_name_long"] = long_name
336 def process_passed(self):
337 """Process the test status information as boolean.
339 Boolean is used to make post processing more efficient.
340 In case the test status is PASS, we will truncate the test message.
342 status = BuiltIn().get_variable_value("\\${TEST_STATUS}")
343 if status is not None:
344 self.data["passed"] = (status == "PASS")
345 if self.data["passed"]:
346 # Also truncate success test messages.
347 self.data["message"] = ""
349 def process_results(self):
350 """Process measured results.
352 Results are used to avoid future post processing, making it more
353 efficient to consume.
355 if self.data["telemetry"]:
356 telemetry_encode = "\n".join(self.data["telemetry"]).encode()
357 telemetry_compress = compress(telemetry_encode, level=9)
358 telemetry_base64 = b2a_base64(telemetry_compress, newline=False)
359 self.data["telemetry"] = [telemetry_base64.decode()]
360 if "result" not in self.data:
362 result_node = self.data["result"]
363 result_type = result_node["type"]
364 if result_type == "unknown":
365 # Device or something else not supported.
368 # Compute avg and stdev for mrr (rate and bandwidth).
369 if result_type == "mrr":
370 for node_name in ("rate", "bandwidth"):
371 node = result_node["receive_rate"].get(node_name, None)
373 stats = AvgStdevStats.for_runs(node["values"])
374 node["avg"] = stats.avg
375 node["stdev"] = stats.stdev
378 # Multiple processing steps for ndrpdr.
379 if result_type != "ndrpdr":
381 # Filter out invalid latencies.
382 for which_key in ("latency_forward", "latency_reverse"):
383 if which_key not in result_node:
384 # Probably just an unidir test.
386 for load in ("pdr_0", "pdr_10", "pdr_50", "pdr_90"):
387 if result_node[which_key][load]["max"] <= 0:
388 # One invalid number is enough to remove all loads.
391 # No break means all numbers are ok, nothing to do here.
393 # Break happened, something is invalid, remove all loads.
394 result_node.pop(which_key)