1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Data pre-processing
16 - extract data from output.xml files generated by Jenkins jobs and store in
18 - provide access to the data.
19 - filter the data using tags,
27 from collections import OrderedDict
28 from os import remove, walk, listdir
29 from os.path import isfile, isdir, join
30 from datetime import datetime as dt
31 from datetime import timedelta
32 from json import loads
33 from json.decoder import JSONDecodeError
40 from robot.api import ExecutionResult, ResultVisitor
41 from robot import errors
43 from resources.libraries.python import jumpavg
44 from input_data_files import download_and_unzip_data_file
45 from pal_errors import PresentationError
48 # Separator used in file names
52 class ExecutionChecker(ResultVisitor):
53 """Class to traverse through the test suite structure.
56 REGEX_PLR_RATE = re.compile(
57 r'PLRsearch lower bound::?\s(\d+.\d+).*\n'
58 r'PLRsearch upper bound::?\s(\d+.\d+)'
60 REGEX_NDRPDR_RATE = re.compile(
61 r'NDR_LOWER:\s(\d+.\d+).*\n.*\n'
62 r'NDR_UPPER:\s(\d+.\d+).*\n'
63 r'PDR_LOWER:\s(\d+.\d+).*\n.*\n'
64 r'PDR_UPPER:\s(\d+.\d+)'
66 REGEX_NDRPDR_GBPS = re.compile(
67 r'NDR_LOWER:.*,\s(\d+.\d+).*\n.*\n'
68 r'NDR_UPPER:.*,\s(\d+.\d+).*\n'
69 r'PDR_LOWER:.*,\s(\d+.\d+).*\n.*\n'
70 r'PDR_UPPER:.*,\s(\d+.\d+)'
72 REGEX_PERF_MSG_INFO = re.compile(
73 r'NDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
74 r'PDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
75 r'Latency at 90% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
76 r'Latency at 50% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
77 r'Latency at 10% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
79 REGEX_CPS_MSG_INFO = re.compile(
80 r'NDR_LOWER:\s(\d+.\d+)\s.*\s.*\n.*\n.*\n'
81 r'PDR_LOWER:\s(\d+.\d+)\s.*\s.*\n.*\n.*'
83 REGEX_PPS_MSG_INFO = re.compile(
84 r'NDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
85 r'PDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*'
87 REGEX_MRR_MSG_INFO = re.compile(r'.*\[(.*)\]')
89 REGEX_VSAP_MSG_INFO = re.compile(
90 r'Transfer Rate: (\d*.\d*).*\n'
91 r'Latency: (\d*.\d*).*\n'
92 r'Completed requests: (\d*).*\n'
93 r'Failed requests: (\d*).*\n'
94 r'Total data transferred: (\d*).*\n'
95 r'Connection [cr]ps rate:\s*(\d*.\d*)'
98 # Needed for CPS and PPS tests
99 REGEX_NDRPDR_LAT_BASE = re.compile(
100 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
101 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]'
103 REGEX_NDRPDR_LAT = re.compile(
104 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
105 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n'
106 r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
107 r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
108 r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
109 r'Latency.*\[\'(.*)\', \'(.*)\'\]'
112 REGEX_VERSION_VPP = re.compile(
113 r"(VPP Version:\s*|VPP version:\s*)(.*)"
115 REGEX_VERSION_DPDK = re.compile(
116 r"(DPDK version:\s*|DPDK Version:\s*)(.*)"
118 REGEX_TCP = re.compile(
119 r'Total\s(rps|cps|throughput):\s(\d*).*$'
121 REGEX_MRR = re.compile(
122 r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
123 r'tx\s(\d*),\srx\s(\d*)'
125 REGEX_BMRR = re.compile(
126 r'.*trial results.*: \[(.*)\]'
128 REGEX_RECONF_LOSS = re.compile(
129 r'Packets lost due to reconfig: (\d*)'
131 REGEX_RECONF_TIME = re.compile(
132 r'Implied time lost: (\d*.[\de-]*)'
134 REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
136 REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
138 REGEX_TC_NUMBER = re.compile(r'tc\d{2}-')
140 REGEX_TC_PAPI_CLI = re.compile(r'.*\((\d+.\d+.\d+.\d+.) - (.*)\)')
142 REGEX_SH_RUN_HOST = re.compile(
143 r'hostname=\"(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})\",hook=\"(.*)\"'
146 def __init__(self, metadata, mapping, ignore, process_oper):
149 :param metadata: Key-value pairs to be included in "metadata" part of
151 :param mapping: Mapping of the old names of test cases to the new
153 :param ignore: List of TCs to be ignored.
154 :param process_oper: If True, operational data (show run, telemetry) is
159 :type process_oper: bool
162 # Mapping of TCs long names
163 self._mapping = mapping
166 self._ignore = ignore
168 # Process operational data
169 self._process_oper = process_oper
171 # Name of currently processed keyword
178 self._timestamp = None
180 # Testbed. The testbed is identified by TG node IP address.
183 # Number of PAPI History messages found:
185 # 1 - PAPI History of DUT1
186 # 2 - PAPI History of DUT2
187 self._conf_history_lookup_nr = 0
189 self._sh_run_counter = 0
190 self._telemetry_kw_counter = 0
191 self._telemetry_msg_counter = 0
193 # Test ID of currently processed test- the lowercase full path to the
197 # The main data structure
204 # Save the provided metadata
205 for key, val in metadata.items():
206 self._data["metadata"][key] = val
210 """Getter - Data parsed from the XML file.
212 :returns: Data parsed from the XML file.
217 def _get_data_from_mrr_test_msg(self, msg):
218 """Get info from message of MRR performance tests.
220 :param msg: Message to be processed.
222 :returns: Processed message or original message if a problem occurs.
226 groups = re.search(self.REGEX_MRR_MSG_INFO, msg)
227 if not groups or groups.lastindex != 1:
228 return "Test Failed."
231 data = groups.group(1).split(", ")
232 except (AttributeError, IndexError, ValueError, KeyError):
233 return "Test Failed."
238 out_str += f"{(float(item) / 1e6):.2f}, "
239 return out_str[:-2] + "]"
240 except (AttributeError, IndexError, ValueError, KeyError):
241 return "Test Failed."
243 def _get_data_from_cps_test_msg(self, msg):
244 """Get info from message of NDRPDR CPS tests.
246 :param msg: Message to be processed.
248 :returns: Processed message or "Test Failed." if a problem occurs.
252 groups = re.search(self.REGEX_CPS_MSG_INFO, msg)
253 if not groups or groups.lastindex != 2:
254 return "Test Failed."
258 f"1. {(float(groups.group(1)) / 1e6):5.2f}\n"
259 f"2. {(float(groups.group(2)) / 1e6):5.2f}"
261 except (AttributeError, IndexError, ValueError, KeyError):
262 return "Test Failed."
264 def _get_data_from_pps_test_msg(self, msg):
265 """Get info from message of NDRPDR PPS tests.
267 :param msg: Message to be processed.
269 :returns: Processed message or "Test Failed." if a problem occurs.
273 groups = re.search(self.REGEX_PPS_MSG_INFO, msg)
274 if not groups or groups.lastindex != 4:
275 return "Test Failed."
279 f"1. {(float(groups.group(1)) / 1e6):5.2f} "
280 f"{float(groups.group(2)):5.2f}\n"
281 f"2. {(float(groups.group(3)) / 1e6):5.2f} "
282 f"{float(groups.group(4)):5.2f}"
284 except (AttributeError, IndexError, ValueError, KeyError):
285 return "Test Failed."
287 def _get_data_from_perf_test_msg(self, msg):
288 """Get info from message of NDRPDR performance tests.
290 :param msg: Message to be processed.
292 :returns: Processed message or "Test Failed." if a problem occurs.
296 groups = re.search(self.REGEX_PERF_MSG_INFO, msg)
297 if not groups or groups.lastindex != 10:
298 return "Test Failed."
302 "ndr_low": float(groups.group(1)),
303 "ndr_low_b": float(groups.group(2)),
304 "pdr_low": float(groups.group(3)),
305 "pdr_low_b": float(groups.group(4)),
306 "pdr_lat_90_1": groups.group(5),
307 "pdr_lat_90_2": groups.group(6),
308 "pdr_lat_50_1": groups.group(7),
309 "pdr_lat_50_2": groups.group(8),
310 "pdr_lat_10_1": groups.group(9),
311 "pdr_lat_10_2": groups.group(10),
313 except (AttributeError, IndexError, ValueError, KeyError):
314 return "Test Failed."
316 def _process_lat(in_str_1, in_str_2):
317 """Extract P50, P90 and P99 latencies or min, avg, max values from
320 :param in_str_1: Latency string for one direction produced by robot
322 :param in_str_2: Latency string for second direction produced by
326 :returns: Processed latency string or None if a problem occurs.
329 in_list_1 = in_str_1.split('/', 3)
330 in_list_2 = in_str_2.split('/', 3)
332 if len(in_list_1) != 4 and len(in_list_2) != 4:
335 in_list_1[3] += "=" * (len(in_list_1[3]) % 4)
337 hdr_lat_1 = hdrh.histogram.HdrHistogram.decode(in_list_1[3])
338 except hdrh.codec.HdrLengthException:
341 in_list_2[3] += "=" * (len(in_list_2[3]) % 4)
343 hdr_lat_2 = hdrh.histogram.HdrHistogram.decode(in_list_2[3])
344 except hdrh.codec.HdrLengthException:
347 if hdr_lat_1 and hdr_lat_2:
349 hdr_lat_1.get_value_at_percentile(50.0),
350 hdr_lat_1.get_value_at_percentile(90.0),
351 hdr_lat_1.get_value_at_percentile(99.0),
352 hdr_lat_2.get_value_at_percentile(50.0),
353 hdr_lat_2.get_value_at_percentile(90.0),
354 hdr_lat_2.get_value_at_percentile(99.0)
360 int(in_list_1[0]), int(in_list_1[1]), int(in_list_1[2]),
361 int(in_list_2[0]), int(in_list_2[1]), int(in_list_2[2])
364 if item in (-1, 4294967295, 0):
370 f"1. {(data['ndr_low'] / 1e6):5.2f} "
371 f"{data['ndr_low_b']:5.2f}"
372 f"\n2. {(data['pdr_low'] / 1e6):5.2f} "
373 f"{data['pdr_low_b']:5.2f}"
376 _process_lat(data['pdr_lat_10_1'], data['pdr_lat_10_2']),
377 _process_lat(data['pdr_lat_50_1'], data['pdr_lat_50_2']),
378 _process_lat(data['pdr_lat_90_1'], data['pdr_lat_90_2'])
381 max_len = len(str(max((max(item) for item in latency))))
382 max_len = 4 if max_len < 4 else max_len
384 for idx, lat in enumerate(latency):
389 f"{lat[0]:{max_len}d} "
390 f"{lat[1]:{max_len}d} "
391 f"{lat[2]:{max_len}d} "
392 f"{lat[3]:{max_len}d} "
393 f"{lat[4]:{max_len}d} "
394 f"{lat[5]:{max_len}d} "
399 except (AttributeError, IndexError, ValueError, KeyError):
400 return "Test Failed."
402 def _get_testbed(self, msg):
403 """Called when extraction of testbed IP is required.
404 The testbed is identified by TG node IP address.
406 :param msg: Message to process.
411 if msg.message.count("Setup of TG node") or \
412 msg.message.count("Setup of node TG host"):
413 reg_tg_ip = re.compile(
414 r'.*TG .* (\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).*')
416 self._testbed = str(re.search(reg_tg_ip, msg.message).group(1))
417 except (KeyError, ValueError, IndexError, AttributeError):
420 self._data["metadata"]["testbed"] = self._testbed
422 def _get_vpp_version(self, msg):
423 """Called when extraction of VPP version is required.
425 :param msg: Message to process.
430 if msg.message.count("VPP version:") or \
431 msg.message.count("VPP Version:"):
433 re.search(self.REGEX_VERSION_VPP, msg.message).group(2)
435 self._data["metadata"]["version"] = self._version
437 def _get_dpdk_version(self, msg):
438 """Called when extraction of DPDK version is required.
440 :param msg: Message to process.
445 if msg.message.count("DPDK Version:"):
447 self._version = str(re.search(
448 self.REGEX_VERSION_DPDK, msg.message).group(2))
449 self._data["metadata"]["version"] = self._version
453 def _get_papi_history(self, msg):
454 """Called when extraction of PAPI command history is required.
456 :param msg: Message to process.
460 if msg.message.count("PAPI command history:"):
461 self._conf_history_lookup_nr += 1
462 if self._conf_history_lookup_nr == 1:
463 self._data["tests"][self._test_id]["conf-history"] = str()
465 r"\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} PAPI command history:",
470 self._data["tests"][self._test_id]["conf-history"] += \
471 f"**DUT{str(self._conf_history_lookup_nr)}:** {text}"
473 def _get_show_run(self, msg):
474 """Called when extraction of VPP operational data (output of CLI command
475 Show Runtime) is required.
477 :param msg: Message to process.
482 if not msg.message.count("stats runtime"):
486 if self._sh_run_counter > 1:
489 if "show-run" not in self._data["tests"][self._test_id].keys():
490 self._data["tests"][self._test_id]["show-run"] = dict()
492 groups = re.search(self.REGEX_TC_PAPI_CLI, msg.message)
496 host = groups.group(1)
497 except (AttributeError, IndexError):
500 sock = groups.group(2)
501 except (AttributeError, IndexError):
504 dut = "dut{nr}".format(
505 nr=len(self._data['tests'][self._test_id]['show-run'].keys()) + 1)
507 self._data['tests'][self._test_id]['show-run'][dut] = \
512 "runtime": str(msg.message).replace(' ', '').
513 replace('\n', '').replace("'", '"').
514 replace('b"', '"').replace('"', '"').
519 def _get_telemetry(self, msg):
520 """Called when extraction of VPP telemetry data is required.
522 :param msg: Message to process.
527 if self._telemetry_kw_counter > 1:
529 if not msg.message.count("# TYPE vpp_runtime_calls"):
532 if "telemetry-show-run" not in \
533 self._data["tests"][self._test_id].keys():
534 self._data["tests"][self._test_id]["telemetry-show-run"] = dict()
536 self._telemetry_msg_counter += 1
537 groups = re.search(self.REGEX_SH_RUN_HOST, msg.message)
541 host = groups.group(1)
542 except (AttributeError, IndexError):
545 sock = groups.group(2)
546 except (AttributeError, IndexError):
549 "source_type": "node",
551 "msg_type": "metric",
553 "timestamp": msg.timestamp,
554 "msg": "show_runtime",
559 for line in msg.message.splitlines():
560 if not line.startswith("vpp_runtime_"):
563 params, value, timestamp = line.rsplit(" ", maxsplit=2)
564 cut = params.index("{")
565 name = params[:cut].split("_", maxsplit=2)[-1]
567 "dict" + params[cut:].replace('{', '(').replace('}', ')')
569 labels["graph_node"] = labels.pop("name")
570 runtime["data"].append(
574 "timestamp": timestamp,
578 except (TypeError, ValueError, IndexError):
580 self._data['tests'][self._test_id]['telemetry-show-run']\
581 [f"dut{self._telemetry_msg_counter}"] = copy.copy(
589 def _get_ndrpdr_throughput(self, msg):
590 """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
593 :param msg: The test message to be parsed.
595 :returns: Parsed data as a dict and the status (PASS/FAIL).
596 :rtype: tuple(dict, str)
600 "NDR": {"LOWER": -1.0, "UPPER": -1.0},
601 "PDR": {"LOWER": -1.0, "UPPER": -1.0}
604 groups = re.search(self.REGEX_NDRPDR_RATE, msg)
606 if groups is not None:
608 throughput["NDR"]["LOWER"] = float(groups.group(1))
609 throughput["NDR"]["UPPER"] = float(groups.group(2))
610 throughput["PDR"]["LOWER"] = float(groups.group(3))
611 throughput["PDR"]["UPPER"] = float(groups.group(4))
613 except (IndexError, ValueError):
616 return throughput, status
618 def _get_ndrpdr_throughput_gbps(self, msg):
619 """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER in Gbps from the
622 :param msg: The test message to be parsed.
624 :returns: Parsed data as a dict and the status (PASS/FAIL).
625 :rtype: tuple(dict, str)
629 "NDR": {"LOWER": -1.0, "UPPER": -1.0},
630 "PDR": {"LOWER": -1.0, "UPPER": -1.0}
633 groups = re.search(self.REGEX_NDRPDR_GBPS, msg)
635 if groups is not None:
637 gbps["NDR"]["LOWER"] = float(groups.group(1))
638 gbps["NDR"]["UPPER"] = float(groups.group(2))
639 gbps["PDR"]["LOWER"] = float(groups.group(3))
640 gbps["PDR"]["UPPER"] = float(groups.group(4))
642 except (IndexError, ValueError):
647 def _get_plr_throughput(self, msg):
648 """Get PLRsearch lower bound and PLRsearch upper bound from the test
651 :param msg: The test message to be parsed.
653 :returns: Parsed data as a dict and the status (PASS/FAIL).
654 :rtype: tuple(dict, str)
662 groups = re.search(self.REGEX_PLR_RATE, msg)
664 if groups is not None:
666 throughput["LOWER"] = float(groups.group(1))
667 throughput["UPPER"] = float(groups.group(2))
669 except (IndexError, ValueError):
672 return throughput, status
674 def _get_ndrpdr_latency(self, msg):
675 """Get LATENCY from the test message.
677 :param msg: The test message to be parsed.
679 :returns: Parsed data as a dict and the status (PASS/FAIL).
680 :rtype: tuple(dict, str)
690 "direction1": copy.copy(latency_default),
691 "direction2": copy.copy(latency_default)
694 "direction1": copy.copy(latency_default),
695 "direction2": copy.copy(latency_default)
698 "direction1": copy.copy(latency_default),
699 "direction2": copy.copy(latency_default)
702 "direction1": copy.copy(latency_default),
703 "direction2": copy.copy(latency_default)
706 "direction1": copy.copy(latency_default),
707 "direction2": copy.copy(latency_default)
710 "direction1": copy.copy(latency_default),
711 "direction2": copy.copy(latency_default)
715 groups = re.search(self.REGEX_NDRPDR_LAT, msg)
717 groups = re.search(self.REGEX_NDRPDR_LAT_BASE, msg)
719 return latency, "FAIL"
721 def process_latency(in_str):
722 """Return object with parsed latency values.
724 TODO: Define class for the return type.
726 :param in_str: Input string, min/avg/max/hdrh format.
728 :returns: Dict with corresponding keys, except hdrh float values.
730 :throws IndexError: If in_str does not have enough substrings.
731 :throws ValueError: If a substring does not convert to float.
733 in_list = in_str.split('/', 3)
736 "min": float(in_list[0]),
737 "avg": float(in_list[1]),
738 "max": float(in_list[2]),
742 if len(in_list) == 4:
743 rval["hdrh"] = str(in_list[3])
748 latency["NDR"]["direction1"] = process_latency(groups.group(1))
749 latency["NDR"]["direction2"] = process_latency(groups.group(2))
750 latency["PDR"]["direction1"] = process_latency(groups.group(3))
751 latency["PDR"]["direction2"] = process_latency(groups.group(4))
752 if groups.lastindex == 4:
753 return latency, "PASS"
754 except (IndexError, ValueError):
758 latency["PDR90"]["direction1"] = process_latency(groups.group(5))
759 latency["PDR90"]["direction2"] = process_latency(groups.group(6))
760 latency["PDR50"]["direction1"] = process_latency(groups.group(7))
761 latency["PDR50"]["direction2"] = process_latency(groups.group(8))
762 latency["PDR10"]["direction1"] = process_latency(groups.group(9))
763 latency["PDR10"]["direction2"] = process_latency(groups.group(10))
764 latency["LAT0"]["direction1"] = process_latency(groups.group(11))
765 latency["LAT0"]["direction2"] = process_latency(groups.group(12))
766 if groups.lastindex == 12:
767 return latency, "PASS"
768 except (IndexError, ValueError):
771 return latency, "FAIL"
774 def _get_hoststack_data(msg, tags):
775 """Get data from the hoststack test message.
777 :param msg: The test message to be parsed.
778 :param tags: Test tags.
781 :returns: Parsed data as a JSON dict and the status (PASS/FAIL).
782 :rtype: tuple(dict, str)
787 msg = msg.replace("'", '"').replace(" ", "")
788 if "LDPRELOAD" in tags:
792 except JSONDecodeError:
794 elif "VPPECHO" in tags:
796 msg_lst = msg.replace("}{", "} {").split(" ")
798 client=loads(msg_lst[0]),
799 server=loads(msg_lst[1])
802 except (JSONDecodeError, IndexError):
805 return result, status
807 def _get_vsap_data(self, msg, tags):
808 """Get data from the vsap test message.
810 :param msg: The test message to be parsed.
811 :param tags: Test tags.
814 :returns: Parsed data as a JSON dict and the status (PASS/FAIL).
815 :rtype: tuple(dict, str)
820 groups = re.search(self.REGEX_VSAP_MSG_INFO, msg)
821 if groups is not None:
823 result["transfer-rate"] = float(groups.group(1)) * 1e3
824 result["latency"] = float(groups.group(2))
825 result["completed-requests"] = int(groups.group(3))
826 result["failed-requests"] = int(groups.group(4))
827 result["bytes-transferred"] = int(groups.group(5))
829 result["cps"] = float(groups.group(6))
830 elif "TCP_RPS" in tags:
831 result["rps"] = float(groups.group(6))
833 return result, status
835 except (IndexError, ValueError):
838 return result, status
def visit_suite(self, suite):
    """Implements traversing through the suite and its direct children.

    The setup, the child suites, the tests and the teardown are visited in
    this order, unless ``start_suite`` returns False.

    :param suite: Suite to process.
    :returns: Nothing.
    """
    if self.start_suite(suite) is False:
        return

    for child_name in ("setup", "suites", "tests", "teardown"):
        getattr(suite, child_name).visit(self)
    self.end_suite(suite)
854 def start_suite(self, suite):
855 """Called when suite starts.
857 :param suite: Suite to process.
862 parent_name = suite.parent.name
863 except AttributeError:
866 self._data["suites"][suite.longname.lower().replace('"', "'").\
867 replace(" ", "_")] = {
868 "name": suite.name.lower(),
870 "parent": parent_name,
871 "level": len(suite.longname.split("."))
def visit_test(self, test):
    """Implements traversing through the test.

    The setup, the body and the teardown of the test are visited in this
    order, unless ``start_test`` returns False.

    :param test: Test to process.
    :returns: Nothing.
    """
    if self.start_test(test) is False:
        return

    for part_name in ("setup", "body", "teardown"):
        getattr(test, part_name).visit(self)
    self.end_test(test)
887 def start_test(self, test):
888 """Called when test starts.
890 :param test: Test to process.
895 self._sh_run_counter = 0
896 self._telemetry_kw_counter = 0
897 self._telemetry_msg_counter = 0
899 longname_orig = test.longname.lower()
901 # Check the ignore list
902 if longname_orig in self._ignore:
905 tags = [str(tag) for tag in test.tags]
908 # Change the TC long name and name if defined in the mapping table
909 longname = self._mapping.get(longname_orig, None)
910 if longname is not None:
911 name = longname.split('.')[-1]
913 longname = longname_orig
914 name = test.name.lower()
916 # Remove TC number from the TC long name (backward compatibility):
917 self._test_id = re.sub(self.REGEX_TC_NUMBER, "", longname)
918 # Remove TC number from the TC name (not needed):
919 test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)
921 test_result["parent"] = test.parent.name.lower()
922 test_result["tags"] = tags
923 test_result["doc"] = test.doc
924 test_result["type"] = ""
925 test_result["status"] = test.status
926 test_result["starttime"] = test.starttime
927 test_result["endtime"] = test.endtime
929 if test.status == "PASS":
931 if "TCP_PPS" in tags or "UDP_PPS" in tags:
932 test_result["msg"] = self._get_data_from_pps_test_msg(
934 elif "TCP_CPS" in tags or "UDP_CPS" in tags:
935 test_result["msg"] = self._get_data_from_cps_test_msg(
938 test_result["msg"] = self._get_data_from_perf_test_msg(
940 elif "MRR" in tags or "FRMOBL" in tags or "BMRR" in tags:
941 test_result["msg"] = self._get_data_from_mrr_test_msg(
944 test_result["msg"] = test.message
946 test_result["msg"] = test.message
948 if "PERFTEST" in tags and "TREX" not in tags:
949 # Replace info about cores (e.g. -1c-) with the info about threads
950 # and cores (e.g. -1t1c-) in the long test case names and in the
951 # test case names if necessary.
954 for tag in test_result["tags"]:
955 groups = re.search(self.REGEX_TC_TAG, tag)
961 self._test_id = re.sub(
962 self.REGEX_TC_NAME_NEW, f"-{tag_tc.lower()}-",
963 self._test_id, count=1
965 test_result["name"] = re.sub(
966 self.REGEX_TC_NAME_NEW, f"-{tag_tc.lower()}-",
967 test_result["name"], count=1
970 test_result["status"] = "FAIL"
971 self._data["tests"][self._test_id] = test_result
973 f"The test {self._test_id} has no or more than one "
974 f"multi-threading tags.\n"
975 f"Tags: {test_result['tags']}"
979 if "DEVICETEST" in tags:
980 test_result["type"] = "DEVICETEST"
981 elif "NDRPDR" in tags:
982 if "TCP_CPS" in tags or "UDP_CPS" in tags:
983 test_result["type"] = "CPS"
985 test_result["type"] = "NDRPDR"
986 if test.status == "PASS":
987 test_result["throughput"], test_result["status"] = \
988 self._get_ndrpdr_throughput(test.message)
989 test_result["gbps"], test_result["status"] = \
990 self._get_ndrpdr_throughput_gbps(test.message)
991 test_result["latency"], test_result["status"] = \
992 self._get_ndrpdr_latency(test.message)
993 elif "MRR" in tags or "FRMOBL" in tags or "BMRR" in tags:
995 test_result["type"] = "MRR"
997 test_result["type"] = "BMRR"
998 if test.status == "PASS":
999 test_result["result"] = dict()
1000 groups = re.search(self.REGEX_BMRR, test.message)
1001 if groups is not None:
1002 items_str = groups.group(1)
1004 float(item.strip().replace("'", ""))
1005 for item in items_str.split(",")
1007 # Use whole list in CSIT-1180.
1008 stats = jumpavg.AvgStdevStats.for_runs(items_float)
1009 test_result["result"]["samples"] = items_float
1010 test_result["result"]["receive-rate"] = stats.avg
1011 test_result["result"]["receive-stdev"] = stats.stdev
1013 groups = re.search(self.REGEX_MRR, test.message)
1014 test_result["result"]["receive-rate"] = \
1015 float(groups.group(3)) / float(groups.group(1))
1016 elif "SOAK" in tags:
1017 test_result["type"] = "SOAK"
1018 if test.status == "PASS":
1019 test_result["throughput"], test_result["status"] = \
1020 self._get_plr_throughput(test.message)
1021 elif "LDP_NGINX" in tags:
1022 test_result["type"] = "LDP_NGINX"
1023 test_result["result"], test_result["status"] = \
1024 self._get_vsap_data(test.message, tags)
1025 elif "HOSTSTACK" in tags:
1026 test_result["type"] = "HOSTSTACK"
1027 if test.status == "PASS":
1028 test_result["result"], test_result["status"] = \
1029 self._get_hoststack_data(test.message, tags)
1030 elif "RECONF" in tags:
1031 test_result["type"] = "RECONF"
1032 if test.status == "PASS":
1033 test_result["result"] = None
1035 grps_loss = re.search(self.REGEX_RECONF_LOSS, test.message)
1036 grps_time = re.search(self.REGEX_RECONF_TIME, test.message)
1037 test_result["result"] = {
1038 "loss": int(grps_loss.group(1)),
1039 "time": float(grps_time.group(1))
1041 except (AttributeError, IndexError, ValueError, TypeError):
1042 test_result["status"] = "FAIL"
1044 test_result["status"] = "FAIL"
1046 self._data["tests"][self._test_id] = test_result
def visit_keyword(self, kw):
    """Implements traversing through the keyword and its child keywords.

    The body is visited only if the keyword has one; the teardown is
    visited always. Nothing is visited if ``start_keyword`` returns False.

    :param kw: Keyword to process.
    :type kw: Keyword
    :returns: Nothing.
    """
    if self.start_keyword(kw) is False:
        return

    if hasattr(kw, "body"):
        kw.body.visit(self)
    kw.teardown.visit(self)
    self.end_keyword(kw)
def start_keyword(self, keyword):
    """Called when keyword starts.

    Remembers the name of the keyword; ``start_message`` uses it to decide
    how the messages logged by the keyword are processed.

    :param keyword: Keyword to process.
    :type keyword: Keyword
    :returns: Nothing.
    """
    self._kw_name = keyword.name
def end_keyword(self, keyword):
    """Called when keyword ends.

    Forgets the name of the currently processed keyword, so messages
    visited while no keyword name is set are skipped by ``start_message``.

    :param keyword: Keyword to process.
    :type keyword: Keyword
    :returns: Nothing.
    """
    self._kw_name = None
def visit_message(self, msg):
    """Implements visiting the message.

    ``end_message`` is called unless ``start_message`` returns False.

    :param msg: Message to process.
    :returns: Nothing.
    """
    if self.start_message(msg) is False:
        return
    self.end_message(msg)
def start_message(self, msg):
    """Called when message starts. Get required information from messages.

    The name of the currently processed keyword selects the parser:
    telemetry and show runtime data are processed only if processing of
    operational data is enabled, VPP / DPDK version and testbed only until
    they are known, PAPI history always. Messages visited while no keyword
    name is set are skipped.

    :param msg: Message to process.
    :returns: Nothing.
    """
    kw_name = self._kw_name
    if kw_name is None:
        return

    if "Run Telemetry On All Duts" in kw_name:
        if self._process_oper:
            self._telemetry_kw_counter += 1
            self._get_telemetry(msg)
    elif "Show Runtime On All Duts" in kw_name:
        if self._process_oper:
            self._sh_run_counter += 1
            self._get_show_run(msg)
    elif "Show Vpp Version On All Duts" in kw_name:
        if not self._version:
            self._get_vpp_version(msg)
    elif "Install Dpdk Framework On All Duts" in kw_name:
        if not self._version:
            self._get_dpdk_version(msg)
    elif "Setup Framework" in kw_name:
        if not self._testbed:
            self._get_testbed(msg)
    elif "Show Papi History On All Duts" in kw_name:
        self._conf_history_lookup_nr = 0
        self._get_papi_history(msg)
1125 The data is extracted from output.xml files generated by Jenkins jobs and
1126 stored in pandas' DataFrames.
1132 (as described in ExecutionChecker documentation)
1134 (as described in ExecutionChecker documentation)
1136 (as described in ExecutionChecker documentation)
1139 def __init__(self, spec, for_output):
1142 :param spec: Specification.
1143 :param for_output: Output to be generated from downloaded data.
1144 :type spec: Specification
1145 :type for_output: str
1151 self._for_output = for_output
1154 self._input_data = pd.Series(dtype="float64")
1158 """Getter - Input data.
1160 :returns: Input data
1161 :rtype: pandas.Series
1163 return self._input_data
def metadata(self, job, build):
    """Getter - metadata

    The builds are stored under the string form of the build number, so the
    build is converted to a string before the lookup; both ``7`` and
    ``"7"`` address the same build.

    :param job: Job which metadata we want.
    :param build: Build which metadata we want.
    :type job: str
    :type build: str or int
    :returns: Metadata
    :rtype: pandas.Series
    """
    return self.data[job][str(build)]["metadata"]
def suites(self, job, build):
    """Getter - suites

    The build is converted to a string before the lookup.

    :param job: Job which suites we want.
    :param build: Build which suites we want.
    :type job: str
    :type build: str
    :returns: Suites.
    :rtype: pandas.Series
    """
    builds_of_job = self.data[job]
    build_key = str(build)
    return builds_of_job[build_key]["suites"]
1189 def tests(self, job, build):
1192 :param job: Job which tests we want.
1193 :param build: Build which tests we want.
1197 :rtype: pandas.Series
1199 return self.data[job][build]["tests"]
1201 def _parse_tests(self, job, build):
1202 """Process data from robot output.xml file and return JSON structured
1205 :param job: The name of job which build output data will be processed.
1206 :param build: The build which output data will be processed.
1209 :returns: JSON data structure.
1218 with open(build["file-name"], 'r') as data_file:
1220 result = ExecutionResult(data_file)
1221 except errors.DataError as err:
1223 f"Error occurred while parsing output.xml: {repr(err)}"
1227 process_oper = False
1228 if "-vpp-perf-report-coverage-" in job:
1230 # elif "-vpp-perf-report-iterative-" in job:
1231 # # Exceptions for TBs where we do not have coverage data:
1232 # for item in ("-2n-icx", ):
1234 # process_oper = True
1236 checker = ExecutionChecker(
1237 metadata, self._cfg.mapping, self._cfg.ignore, process_oper
1239 result.visit(checker)
1241 checker.data["metadata"]["tests_total"] = \
1242 result.statistics.total.total
1243 checker.data["metadata"]["tests_passed"] = \
1244 result.statistics.total.passed
1245 checker.data["metadata"]["tests_failed"] = \
1246 result.statistics.total.failed
1247 checker.data["metadata"]["elapsedtime"] = result.suite.elapsedtime
1248 checker.data["metadata"]["generated"] = result.suite.endtime[:14]
1252 def _download_and_parse_build(self, job, build, repeat, pid=10000):
1253 """Download and parse the input data file.
1255 :param pid: PID of the process executing this method.
1256 :param job: Name of the Jenkins job which generated the processed input
1258 :param build: Information about the Jenkins build which generated the
1259 processed input file.
1260 :param repeat: Repeat the download specified number of times if not
1268 logging.info(f"Processing the job/build: {job}: {build['build']}")
1275 success = download_and_unzip_data_file(self._cfg, job, build, pid)
1281 f"It is not possible to download the input data file from the "
1282 f"job {job}, build {build['build']}, or it is damaged. "
1286 logging.info(f" Processing data from build {build['build']}")
1287 data = self._parse_tests(job, build)
1290 f"Input data file from the job {job}, build "
1291 f"{build['build']} is damaged. Skipped."
1297 remove(build["file-name"])
1298 except OSError as err:
1300 f"Cannot remove the file {build['file-name']}: {repr(err)}"
1303 # If the time-period is defined in the specification file, remove all
1304 # files which are outside the time period.
1306 timeperiod = self._cfg.environment.get("time-period", None)
1307 if timeperiod and data:
1309 timeperiod = timedelta(int(timeperiod))
1310 metadata = data.get("metadata", None)
1312 generated = metadata.get("generated", None)
1314 generated = dt.strptime(generated, "%Y%m%d %H:%M")
1315 if (now - generated) > timeperiod:
1316 # Remove the data and the file:
1321 f" The build {job}/{build['build']} is "
1322 f"outdated, will be removed."
def download_and_parse_data(self, repeat=1):
    """Download the input data files, parse input data from input files and
    store in pandas' Series.

    Every build of every job listed in ``self._cfg.input`` is handed to
    ``self._download_and_parse_build``. When data is returned it is stored
    in ``self._input_data[job][str(build_nr)]`` as a Series with the items
    ``metadata``, ``suites`` and ``tests``, and the input file name is
    written to the specification. The state returned for the build is
    written to the specification in either case.

    :param repeat: Repeat the download specified number of times if not
        successful.
    :type repeat: int
    """

    logging.info("Downloading and parsing input files ...")

    for job, builds in self._cfg.input.items():
        for build in builds:

            result = self._download_and_parse_build(job, build, repeat)
            # NOTE(review): a truthy "last" item is taken to mean that the
            # remaining builds of this job need not be processed - confirm
            # against _download_and_parse_build().
            if result.get("last"):
                break
            build_nr = result["build"]["build"]

            # Without data there is nothing to store; only the state of
            # the build is recorded below.
            data = result["data"]
            if data:
                build_data = pd.Series({
                    section: pd.Series(
                        list(data[section].values()),
                        index=list(data[section].keys())
                    )
                    for section in ("metadata", "suites", "tests")
                })

                if self._input_data.get(job, None) is None:
                    self._input_data[job] = pd.Series(dtype="float64")
                self._input_data[job][str(build_nr)] = build_data
                self._cfg.set_input_file_name(
                    job, build_nr, result["build"]["file-name"]
                )
            self._cfg.set_input_state(job, build_nr, result["state"])

            mem_alloc = \
                resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
            logging.info(f"Memory allocation: {mem_alloc:.0f}MB")

    logging.info("Done.")

    # Summary of the sources with at least one successful download.
    summary = ["Successful downloads from the sources:\n"]
    for source in self._cfg.environment["data-sources"]:
        if source["successful-downloads"]:
            summary.append(
                f"{source['url']}/{source['path']}/"
                f"{source['file-name']}: "
                f"{source['successful-downloads']}\n"
            )
    logging.info("".join(summary))
def process_local_file(self, local_file, job="local", build_nr=1,
                       replace=True):
    """Process local XML file given as a command-line parameter.

    The file is registered in the specification as a build of ``job``,
    parsed by ``self._parse_tests`` and stored in
    ``self._input_data[job][str(build_nr)]`` as a Series with the items
    ``metadata``, ``suites`` and ``tests``.

    :param local_file: The file to process.
    :param job: Job name.
    :param build_nr: Build number. It is overridden by the number encoded
        in the file name when the part of the name before the first dot
        is an integer.
    :param replace: If True, the information about jobs and builds is
        replaced by the new one, otherwise the new jobs and builds are
        added.
    :type local_file: str
    :type job: str
    :type build_nr: int
    :type replace: bool
    :raises: PresentationError if an error occurs.
    """
    if not isfile(local_file):
        raise PresentationError(f"The file {local_file} does not exist.")

    # Prefer the build number encoded in the file name; keep the one given
    # by the caller when the name does not start with an integer.
    try:
        build_nr = int(local_file.split("/")[-1].split(".")[0])
    except (IndexError, ValueError):
        pass

    build = {
        "build": build_nr,
        # NOTE(review): initial status value - confirm against the
        # specification's add_build().
        "status": "failed",
        "file-name": local_file
    }
    if replace:
        self._cfg.input = dict()
    self._cfg.add_build(job, build)

    logging.info(f"Processing {job}: {build_nr:2d}: {local_file}")
    data = self._parse_tests(job, build)
    if data is None:
        raise PresentationError(
            f"Error occurred while parsing the file {local_file}"
        )

    build_data = pd.Series({
        section: pd.Series(
            list(data[section].values()),
            index=list(data[section].keys())
        )
        for section in ("metadata", "suites", "tests")
    })

    if self._input_data.get(job, None) is None:
        self._input_data[job] = pd.Series(dtype="float64")
    self._input_data[job][str(build_nr)] = build_data

    self._cfg.set_input_state(job, build_nr, "processed")
def process_local_directory(self, local_dir, replace=True):
    """Process local directory with XML file(s). The directory is processed
    as a 'job' and the XML files in it as builds.
    If the given directory contains only sub-directories, these
    sub-directories are processed as jobs and the files in them as builds
    of their job.

    Files are processed in sorted order and passed to
    ``process_local_file`` with the build numbers 1, 2, ... Sub-directories
    which contain no files are skipped.

    :param local_dir: Local directory to process.
    :param replace: If True, the information about jobs and builds is
        replaced by the new one, otherwise the new jobs and builds are
        added.
    :type local_dir: str
    :type replace: bool
    :raises: PresentationError if the directory does not exist, is empty or
        includes both files and directories.
    """
    if not isdir(local_dir):
        raise PresentationError(
            f"The directory {local_dir} does not exist."
        )

    # Check if the given directory includes only files, or only directories
    _, dirnames, filenames = next(walk(local_dir))

    if not filenames and not dirnames:
        raise PresentationError(f"The directory {local_dir} is empty.")
    if filenames and dirnames:
        raise PresentationError(
            f"The directory {local_dir} can include only files or only "
            f"directories, not both.\nThe directory {local_dir} includes "
            f"file(s):\n{filenames}\nand directories:\n{dirnames}"
        )

    # local_builds:
    # key: dir (job) name, value: list of file names (builds)
    if filenames:
        filenames.sort()
        local_builds = {
            local_dir: [join(local_dir, name) for name in filenames]
        }
    else:
        dirnames.sort()
        local_builds = dict()
        for dirname in dirnames:
            builds = [
                join(local_dir, dirname, name)
                for name in listdir(join(local_dir, dirname))
                if isfile(join(local_dir, dirname, name))
            ]
            if builds:
                local_builds[dirname] = sorted(builds)

    if replace:
        self._cfg.input = dict()

    # The specification was already reset above if requested, so the
    # individual files must only be added.
    for job, files in local_builds.items():
        for build_nr, local_file in enumerate(files, start=1):
            self.process_local_file(local_file, job, build_nr, replace=False)
1514 def _end_of_tag(tag_filter, start=0, closer="'"):
1515 """Return the index of character in the string which is the end of tag.
1517 :param tag_filter: The string where the end of tag is being searched.
1518 :param start: The index where the searching is stated.
1519 :param closer: The character which is the tag closer.
1520 :type tag_filter: str
1523 :returns: The index of the tag closer.
1527 idx_opener = tag_filter.index(closer, start)
1528 return tag_filter.index(closer, idx_opener + 1)
@staticmethod
def _condition(tag_filter):
    """Create a conditional statement from the given tag filter.

    The text `` in tags`` is inserted after every tag enclosed by
    apostrophes, e.g. ``'A' and 'B'`` becomes
    ``'A' in tags and 'B' in tags``.

    :param tag_filter: Filter based on tags from the element specification.
    :type tag_filter: str
    :returns: Conditional statement which can be evaluated.
    :rtype: str
    """
    index = 0
    while True:
        index = InputData._end_of_tag(tag_filter, index)
        if index is None:
            # No further complete tag, the statement is finished.
            return tag_filter
        # Step behind the closing apostrophe; the next search then starts
        # in the inserted text, which contains no apostrophe.
        index += 1
        tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
def filter_data(self, element, params=None, data=None, data_set="tests",
                continue_on_error=False):
    """Filter required data from the given jobs and builds.

    The output data structure is:

    - job 1
      - build 1
        - test (or suite) 1 ID:
          - param 1
          - param 2
          ...
          - param n
        ...
        - test (or suite) n ID:
        ...
      ...
      - build n
    ...
    - job n

    :param element: Element which will use the filtered data.
    :param params: Parameters which will be included in the output. If None,
        the parameters are read from the element; if the element defines
        none, all parameters are included.
    :param data: If not None, this data is used instead of data specified
        in the element.
    :param data_set: The set of data to be filtered: tests, suites,
        metadata.
    :param continue_on_error: Continue if there is error while reading the
        data. The Item will be empty then
    :type element: pandas.Series
    :type params: list
    :type data: dict
    :type data_set: str
    :type continue_on_error: bool
    :returns: Filtered data, or None if the filter is missing or wrong or
        if the data cannot be read.
    :rtype: pandas.Series
    """

    try:
        if data_set == "suites" or element["filter"] in ("all", "template"):
            cond = "True"
        else:
            cond = InputData._condition(element["filter"])
        logging.debug(f" Filter: {cond}")
    except KeyError:
        logging.error(" No filter defined.")
        return None

    if params is None:
        params = element.get("parameters", None)
        if params:
            # Extend a copy: the list belongs to the element specification
            # and must not grow with every call.
            extra = [
                name for name in ("type", "status") if name not in params
            ]
            params = list(params) + extra

    data_to_filter = data if data else element["data"]
    data = pd.Series(dtype="float64")
    try:
        for job, builds in data_to_filter.items():
            data[job] = pd.Series(dtype="float64")
            for build in builds:
                data[job][str(build)] = pd.Series(dtype="float64")
                try:
                    data_dict = dict(
                        self.data[job][str(build)][data_set].items())
                except KeyError:
                    if continue_on_error:
                        continue
                    return None

                for test_id, test_data in data_dict.items():
                    # NOTE(review): eval() executes the filter taken from
                    # the element specification; the specification must
                    # come from a trusted source.
                    if not eval(cond, {"tags": test_data.get("tags", "")}):
                        continue
                    data[job][str(build)][test_id] = \
                        pd.Series(dtype="float64")
                    if params is None:
                        for param, val in test_data.items():
                            data[job][str(build)][test_id][param] = val
                    else:
                        for param in params:
                            try:
                                data[job][str(build)][test_id][param] =\
                                    test_data[param]
                            except KeyError:
                                data[job][str(build)][test_id][param] =\
                                    "No Data"
        return data

    except (KeyError, IndexError, ValueError) as err:
        logging.error(
            f"Missing mandatory parameter in the element specification: "
            f"{repr(err)}"
        )
        return None
    except AttributeError as err:
        logging.error(repr(err))
        return None
    except (SyntaxError, NameError) as err:
        # A tag which is not enclosed by apostrophes is evaluated as a
        # name and raises NameError rather than SyntaxError.
        logging.error(
            f"The filter {cond} is not correct. Check if all tags are "
            f"enclosed by apostrophes.\n{repr(err)}"
        )
        return None
def filter_tests_by_name(self, element, params=None, data_set="tests",
                         continue_on_error=False):
    """Filter required data from the given jobs and builds.

    The items are selected by their IDs: the lower-cased ID must match one
    of the lower-cased regular expressions listed in the item ``include``
    of the element. If the element defines the item ``core``, every
    expression is formatted once per listed core (``{core}`` placeholder).

    The output data structure is:

    - job 1
      - build 1
        - test (or suite) 1 ID:
          - param 1
          - param 2
          ...
          - param n
        ...
        - test (or suite) n ID:
        ...
      ...
      - build n
    ...
    - job n

    :param element: Element which will use the filtered data.
    :param params: Parameters which will be included in the output. If None,
        the parameters are read from the element; if the element defines
        none, all parameters are included.
    :param data_set: The set of data to be filtered: tests, suites,
        metadata.
    :param continue_on_error: Continue if there is error while reading the
        data. The Item will be empty then
    :type element: pandas.Series
    :type params: list
    :type data_set: str
    :type continue_on_error: bool
    :returns: Filtered data, or None if there is nothing to include or if
        the data cannot be read.
    :rtype: pandas.Series
    """

    include = element.get("include", None)
    if not include:
        logging.warning("No tests to include, skipping the element.")
        return None

    if params is None:
        params = element.get("parameters", None)
        if params and "type" not in params:
            # Build a new list: the original one belongs to the element
            # specification and must stay untouched.
            params = [*params, "type"]

    cores = element.get("core", None)
    if cores:
        tests = [
            test.format(core=core) for core in cores for test in include
        ]
    else:
        tests = include

    data = pd.Series(dtype="float64")
    try:
        for job, builds in element["data"].items():
            data[job] = pd.Series(dtype="float64")
            for build in builds:
                data[job][str(build)] = pd.Series(dtype="float64")
                for test in tests:
                    try:
                        reg_ex = re.compile(str(test).lower())
                        for test_id in self.data[job][
                                str(build)][data_set].keys():
                            if not reg_ex.match(str(test_id).lower()):
                                continue
                            test_data = self.data[job][
                                str(build)][data_set][test_id]
                            data[job][str(build)][test_id] = \
                                pd.Series(dtype="float64")
                            if params is None:
                                for param, val in test_data.items():
                                    data[job][str(build)][test_id]\
                                        [param] = val
                            else:
                                for param in params:
                                    try:
                                        data[job][str(build)][
                                            test_id][param] = \
                                            test_data[param]
                                    except KeyError:
                                        data[job][str(build)][
                                            test_id][param] = "No Data"
                    except KeyError as err:
                        if continue_on_error:
                            logging.debug(repr(err))
                            continue
                        logging.error(repr(err))
                        return None
        return data

    except (KeyError, IndexError, ValueError) as err:
        logging.error(
            f"Missing mandatory parameter in the element "
            f"specification: {repr(err)}"
        )
        return None
    except AttributeError as err:
        logging.error(repr(err))
        return None
1754 def merge_data(data):
1755 """Merge data from more jobs and builds to a simple data structure.
1757 The output data structure is:
1759 - test (suite) 1 ID:
1765 - test (suite) n ID:
1768 :param data: Data to merge.
1769 :type data: pandas.Series
1770 :returns: Merged data.
1771 :rtype: pandas.Series
1774 logging.info(" Merging data ...")
1776 merged_data = pd.Series(dtype="float64")
1777 for builds in data.values:
1778 for item in builds.values:
1779 for item_id, item_data in item.items():
1780 merged_data[item_id] = item_data
def print_all_oper_data(self):
    """Print all operational data to console.

    For every test in ``self._input_data`` the test ID is printed. If the
    test includes the item ``show-run``, the JSON encoded ``runtime`` of
    each of its entries is decoded and one table per thread is printed.
    Rows with zero calls, vectors and suspends are left out.
    """

    for job in self._input_data.values:
        for build in job.values:
            for test_id, test_data in build["tests"].items():
                print(f"{test_id}")
                if test_data.get("show-run", None) is None:
                    continue
                for dut_name, data in test_data["show-run"].items():
                    if data.get("runtime", None) is None:
                        continue
                    runtime = loads(data["runtime"])
                    try:
                        # The first item defines the number of threads.
                        threads_nr = len(runtime[0]["clocks"])
                    except (IndexError, KeyError):
                        continue
                    threads = OrderedDict(
                        {idx: list() for idx in range(threads_nr)})
                    for item in runtime:
                        for idx in range(threads_nr):
                            calls = item["calls"][idx]
                            vectors = item["vectors"][idx]
                            suspends = item["suspends"][idx]

                            # Clocks are related to the first non-zero
                            # counter: vectors, calls, suspends.
                            if vectors > 0:
                                clocks = item["clocks"][idx] / vectors
                            elif calls > 0:
                                clocks = item["clocks"][idx] / calls
                            elif suspends > 0:
                                clocks = item["clocks"][idx] / suspends
                            else:
                                clocks = 0.0

                            if calls > 0:
                                vectors_call = vectors / calls
                            else:
                                vectors_call = 0.0

                            if int(calls) + int(vectors) + int(suspends):
                                threads[idx].append([
                                    # NOTE(review): the key "name" is
                                    # assumed for the first column -
                                    # confirm against the runtime data.
                                    item["name"],
                                    calls,
                                    vectors,
                                    suspends,
                                    clocks,
                                    vectors_call
                                ])

                    print(f"Host IP: {data.get('host', '')}, "
                          f"Socket: {data.get('socket', '')}")
                    for thread_nr, thread in threads.items():
                        # NOTE(review): the labels of the columns two to
                        # four are assumed - confirm.
                        txt_table = prettytable.PrettyTable(
                            (
                                "Name",
                                "Nr of Vectors",
                                "Nr of Packets",
                                "Suspends",
                                "Cycles per Packet",
                                "Average Vector Size"
                            )
                        )
                        vector_size_sum = 0.0
                        for row in thread:
                            txt_table.add_row(row)
                            vector_size_sum += row[-1]
                        if thread:
                            avg = f", Average Vector Size per Node: " \
                                  f"{(vector_size_sum / len(thread)):.2f}"
                        else:
                            avg = ""
                        th_name = "main" if thread_nr == 0 \
                            else f"worker_{thread_nr}"
                        print(f"{dut_name}, {th_name}{avg}")
                        txt_table.float_format = ".2"
                        txt_table.align = "r"
                        txt_table.align["Name"] = "l"
                        print(f"{txt_table.get_string()}\n")