1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Data pre-processing
16 - extract data from output.xml files generated by Jenkins jobs and store in
18 - provide access to the data.
19 - filter the data using tags,
27 from collections import OrderedDict
28 from os import remove, walk, listdir
29 from os.path import isfile, isdir, join
30 from datetime import datetime as dt
31 from datetime import timedelta
32 from json import loads
33 from json.decoder import JSONDecodeError
40 from robot.api import ExecutionResult, ResultVisitor
41 from robot import errors
43 from resources.libraries.python import jumpavg
44 from input_data_files import download_and_unzip_data_file
45 from pal_errors import PresentationError
48 # Separator used in file names
52 class ExecutionChecker(ResultVisitor):
53 """Class to traverse through the test suite structure.
56 REGEX_PLR_RATE = re.compile(
57 r'PLRsearch lower bound::?\s(\d+.\d+).*\n'
58 r'PLRsearch upper bound::?\s(\d+.\d+)'
60 REGEX_NDRPDR_RATE = re.compile(
61 r'NDR_LOWER:\s(\d+.\d+).*\n.*\n'
62 r'NDR_UPPER:\s(\d+.\d+).*\n'
63 r'PDR_LOWER:\s(\d+.\d+).*\n.*\n'
64 r'PDR_UPPER:\s(\d+.\d+)'
66 REGEX_NDRPDR_GBPS = re.compile(
67 r'NDR_LOWER:.*,\s(\d+.\d+).*\n.*\n'
68 r'NDR_UPPER:.*,\s(\d+.\d+).*\n'
69 r'PDR_LOWER:.*,\s(\d+.\d+).*\n.*\n'
70 r'PDR_UPPER:.*,\s(\d+.\d+)'
72 REGEX_PERF_MSG_INFO = re.compile(
73 r'NDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
74 r'PDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
75 r'Latency at 90% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
76 r'Latency at 50% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
77 r'Latency at 10% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
79 REGEX_CPS_MSG_INFO = re.compile(
80 r'NDR_LOWER:\s(\d+.\d+)\s.*\s.*\n.*\n.*\n'
81 r'PDR_LOWER:\s(\d+.\d+)\s.*\s.*\n.*\n.*'
83 REGEX_PPS_MSG_INFO = re.compile(
84 r'NDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
85 r'PDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*'
87 REGEX_MRR_MSG_INFO = re.compile(r'.*\[(.*)\]')
89 REGEX_VSAP_MSG_INFO = re.compile(
90 r'Transfer Rate: (\d*.\d*).*\n'
91 r'Latency: (\d*.\d*).*\n'
92 r'Completed requests: (\d*).*\n'
93 r'Failed requests: (\d*).*\n'
94 r'Total data transferred: (\d*).*\n'
95 r'Connection [cr]ps rate:\s*(\d*.\d*)'
98 # Needed for CPS and PPS tests
99 REGEX_NDRPDR_LAT_BASE = re.compile(
100 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
101 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]'
103 REGEX_NDRPDR_LAT = re.compile(
104 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
105 r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n'
106 r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
107 r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
108 r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
109 r'Latency.*\[\'(.*)\', \'(.*)\'\]'
112 REGEX_VERSION_VPP = re.compile(
113 r"(VPP Version:\s*|VPP version:\s*)(.*)"
115 REGEX_VERSION_DPDK = re.compile(
116 r"(DPDK version:\s*|DPDK Version:\s*)(.*)"
118 REGEX_TCP = re.compile(
119 r'Total\s(rps|cps|throughput):\s(\d*).*$'
121 REGEX_MRR = re.compile(
122 r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
123 r'tx\s(\d*),\srx\s(\d*)'
125 REGEX_BMRR = re.compile(
126 r'.*trial results.*: \[(.*)\]'
128 REGEX_RECONF_LOSS = re.compile(
129 r'Packets lost due to reconfig: (\d*)'
131 REGEX_RECONF_TIME = re.compile(
132 r'Implied time lost: (\d*.[\de-]*)'
134 REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
136 REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
138 REGEX_TC_NUMBER = re.compile(r'tc\d{2}-')
140 REGEX_TC_PAPI_CLI = re.compile(r'.*\((\d+.\d+.\d+.\d+.) - (.*)\)')
142 REGEX_SH_RUN_HOST = re.compile(
143 r'hostname=\"(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})\",hook=\"(.*)\"'
def __init__(self, metadata, mapping, ignore, process_oper):
    """Initialisation.

    :param metadata: Key-value pairs to be included in "metadata" part of
        the parsed data structure.
    :param mapping: Mapping of the old names of test cases to the new
        (actual) ones.
    :param ignore: List of TCs to be ignored.
    :param process_oper: If True, operational data (show run, telemetry) is
        processed.
    :type metadata: dict
    :type mapping: dict
    :type ignore: list
    :type process_oper: bool
    """

    # The main data structure. It has to exist before the metadata is
    # copied into it, otherwise the first write raises AttributeError.
    self._data = {
        "metadata": OrderedDict(),
        "suites": OrderedDict(),
        "tests": OrderedDict()
    }

    # Save the provided metadata
    self._data["metadata"].update(metadata.items())

    # Mapping of TCs long names
    self._mapping = mapping

    # Ignore list
    self._ignore = ignore

    # Process operational data
    self._process_oper = process_oper

    # Name of currently processed keyword
    self._kw_name = None

    # Version (VPP or DPDK) found in the processed messages
    self._version = None

    # Timestamp
    self._timestamp = None

    # Testbed. The testbed is identified by TG node IP address.
    self._testbed = None

    # Number of PAPI History messages found:
    # 0 - no message
    # 1 - PAPI History of DUT1
    # 2 - PAPI History of DUT2
    self._conf_history_lookup_nr = 0

    # Per-test counters, reset at the start of every test
    self._sh_run_counter = 0
    self._telemetry_kw_counter = 0
    self._telemetry_msg_counter = 0

    # Test ID of currently processed test - the lowercase full path to the
    # test
    self._test_id = None
210 """Getter - Data parsed from the XML file.
212 :returns: Data parsed from the XML file.
def _get_data_from_mrr_test_msg(self, msg):
    """Get info from message of MRR performance tests.

    The bracketed, comma separated list captured by REGEX_MRR_MSG_INFO is
    re-printed with every item divided by 1e6 and formatted to two decimal
    places.

    :param msg: Message to be processed.
    :type msg: str
    :returns: Processed message or "Test Failed." if a problem occurs.
    :rtype: str
    """

    groups = re.search(self.REGEX_MRR_MSG_INFO, msg)
    if not groups or groups.lastindex != 1:
        return "Test Failed."

    try:
        rates = [
            f"{(float(item) / 1e6):.2f}"
            for item in groups.group(1).split(", ")
        ]
    except ValueError:
        # At least one item in the list is not a number.
        return "Test Failed."

    return f"[{', '.join(rates)}]"
def _get_data_from_cps_test_msg(self, msg):
    """Get info from message of NDRPDR CPS tests.

    Both values captured by REGEX_CPS_MSG_INFO are divided by 1e6 and
    printed on two numbered lines.

    :param msg: Message to be processed.
    :type msg: str
    :returns: Processed message or "Test Failed." if a problem occurs.
    :rtype: str
    """

    found = re.search(self.REGEX_CPS_MSG_INFO, msg)
    if found is None or found.lastindex != 2:
        return "Test Failed."

    try:
        ndr_low = float(found.group(1)) / 1e6
        pdr_low = float(found.group(2)) / 1e6
    except (AttributeError, IndexError, ValueError, KeyError):
        return "Test Failed."

    return f"1. {ndr_low:5.2f}\n2. {pdr_low:5.2f}"
def _get_data_from_pps_test_msg(self, msg):
    """Get info from message of NDRPDR PPS tests.

    REGEX_PPS_MSG_INFO captures two values per line; the first of each pair
    is divided by 1e6, the second is printed as it is.

    :param msg: Message to be processed.
    :type msg: str
    :returns: Processed message or "Test Failed." if a problem occurs.
    :rtype: str
    """

    found = re.search(self.REGEX_PPS_MSG_INFO, msg)
    if found is None or found.lastindex != 4:
        return "Test Failed."

    try:
        ndr_low = float(found.group(1)) / 1e6
        ndr_low_b = float(found.group(2))
        pdr_low = float(found.group(3)) / 1e6
        pdr_low_b = float(found.group(4))
    except (AttributeError, IndexError, ValueError, KeyError):
        return "Test Failed."

    first_line = f"1. {ndr_low:5.2f} {ndr_low_b:5.2f}"
    second_line = f"2. {pdr_low:5.2f} {pdr_low_b:5.2f}"
    return f"{first_line}\n{second_line}"
287 def _get_data_from_perf_test_msg(self, msg):
288 """Get info from message of NDRPDR performance tests.
290 :param msg: Message to be processed.
292 :returns: Processed message or "Test Failed." if a problem occurs.
296 groups = re.search(self.REGEX_PERF_MSG_INFO, msg)
297 if not groups or groups.lastindex != 10:
298 return "Test Failed."
302 "ndr_low": float(groups.group(1)),
303 "ndr_low_b": float(groups.group(2)),
304 "pdr_low": float(groups.group(3)),
305 "pdr_low_b": float(groups.group(4)),
306 "pdr_lat_90_1": groups.group(5),
307 "pdr_lat_90_2": groups.group(6),
308 "pdr_lat_50_1": groups.group(7),
309 "pdr_lat_50_2": groups.group(8),
310 "pdr_lat_10_1": groups.group(9),
311 "pdr_lat_10_2": groups.group(10),
313 except (AttributeError, IndexError, ValueError, KeyError):
314 return "Test Failed."
316 def _process_lat(in_str_1, in_str_2):
317 """Extract P50, P90 and P99 latencies or min, avg, max values from
320 :param in_str_1: Latency string for one direction produced by robot
322 :param in_str_2: Latency string for second direction produced by
326 :returns: Processed latency string or None if a problem occurs.
329 in_list_1 = in_str_1.split('/', 3)
330 in_list_2 = in_str_2.split('/', 3)
332 if len(in_list_1) != 4 and len(in_list_2) != 4:
335 in_list_1[3] += "=" * (len(in_list_1[3]) % 4)
337 hdr_lat_1 = hdrh.histogram.HdrHistogram.decode(in_list_1[3])
338 except hdrh.codec.HdrLengthException:
341 in_list_2[3] += "=" * (len(in_list_2[3]) % 4)
343 hdr_lat_2 = hdrh.histogram.HdrHistogram.decode(in_list_2[3])
344 except hdrh.codec.HdrLengthException:
347 if hdr_lat_1 and hdr_lat_2:
349 hdr_lat_1.get_value_at_percentile(50.0),
350 hdr_lat_1.get_value_at_percentile(90.0),
351 hdr_lat_1.get_value_at_percentile(99.0),
352 hdr_lat_2.get_value_at_percentile(50.0),
353 hdr_lat_2.get_value_at_percentile(90.0),
354 hdr_lat_2.get_value_at_percentile(99.0)
360 int(in_list_1[0]), int(in_list_1[1]), int(in_list_1[2]),
361 int(in_list_2[0]), int(in_list_2[1]), int(in_list_2[2])
364 if item in (-1, 4294967295, 0):
370 f"1. {(data['ndr_low'] / 1e6):5.2f} "
371 f"{data['ndr_low_b']:5.2f}"
372 f"\n2. {(data['pdr_low'] / 1e6):5.2f} "
373 f"{data['pdr_low_b']:5.2f}"
376 _process_lat(data['pdr_lat_10_1'], data['pdr_lat_10_2']),
377 _process_lat(data['pdr_lat_50_1'], data['pdr_lat_50_2']),
378 _process_lat(data['pdr_lat_90_1'], data['pdr_lat_90_2'])
381 max_len = len(str(max((max(item) for item in latency))))
382 max_len = 4 if max_len < 4 else max_len
384 for idx, lat in enumerate(latency):
389 f"{lat[0]:{max_len}d} "
390 f"{lat[1]:{max_len}d} "
391 f"{lat[2]:{max_len}d} "
392 f"{lat[3]:{max_len}d} "
393 f"{lat[4]:{max_len}d} "
394 f"{lat[5]:{max_len}d} "
399 except (AttributeError, IndexError, ValueError, KeyError):
400 return "Test Failed."
def _get_testbed(self, msg):
    """Called when extraction of testbed IP is required.
    The testbed is identified by TG node IP address.

    Messages which do not announce the TG node setup are ignored. For the
    others, the IP address is stored when it can be found, and the current
    testbed value is written to the metadata in either case.

    :param msg: Message to process.
    :type msg: Message
    :returns: Nothing.
    """

    text = msg.message
    if "Setup of TG node" not in text and \
            "Setup of node TG host" not in text:
        return

    tg_ip_pattern = re.compile(
        r'.*TG .* (\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).*')
    found = re.search(tg_ip_pattern, text)
    if found is not None:
        self._testbed = str(found.group(1))

    self._data["metadata"]["testbed"] = self._testbed
def _get_vpp_version(self, msg):
    """Called when extraction of VPP version is required.

    The version is the text captured by the second group of
    REGEX_VERSION_VPP; it is stored on the checker and in the metadata.

    :param msg: Message to process.
    :type msg: Message
    :returns: Nothing.
    """

    text = msg.message
    if "VPP version:" in text or "VPP Version:" in text:
        version = re.search(self.REGEX_VERSION_VPP, text).group(2)
        self._version = str(version)
        self._data["metadata"]["version"] = self._version
def _get_dpdk_version(self, msg):
    """Called when extraction of DPDK version is required.

    REGEX_VERSION_DPDK is the single source of truth for what counts as a
    version message, so both spellings it accepts ("DPDK version:" and
    "DPDK Version:") are processed. Messages which do not match are
    ignored.

    :param msg: Message to process.
    :type msg: Message
    :returns: Nothing.
    """

    found = re.search(self.REGEX_VERSION_DPDK, msg.message)
    if found is None:
        return

    self._version = str(found.group(2))
    self._data["metadata"]["version"] = self._version
453 def _get_papi_history(self, msg):
454 """Called when extraction of PAPI command history is required.
456 :param msg: Message to process.
460 if msg.message.count("PAPI command history:"):
461 self._conf_history_lookup_nr += 1
462 if self._conf_history_lookup_nr == 1:
463 self._data["tests"][self._test_id]["conf-history"] = str()
465 r"\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} PAPI command history:",
470 self._data["tests"][self._test_id]["conf-history"] += \
471 f"**DUT{str(self._conf_history_lookup_nr)}:** {text}"
473 def _get_show_run(self, msg):
474 """Called when extraction of VPP operational data (output of CLI command
475 Show Runtime) is required.
477 :param msg: Message to process.
482 if not msg.message.count("stats runtime"):
486 if self._sh_run_counter > 1:
489 if "show-run" not in self._data["tests"][self._test_id].keys():
490 self._data["tests"][self._test_id]["show-run"] = dict()
492 groups = re.search(self.REGEX_TC_PAPI_CLI, msg.message)
496 host = groups.group(1)
497 except (AttributeError, IndexError):
500 sock = groups.group(2)
501 except (AttributeError, IndexError):
504 dut = "dut{nr}".format(
505 nr=len(self._data['tests'][self._test_id]['show-run'].keys()) + 1)
507 self._data['tests'][self._test_id]['show-run'][dut] = \
512 "runtime": str(msg.message).replace(' ', '').
513 replace('\n', '').replace("'", '"').
514 replace('b"', '"').replace('"', '"').
519 def _get_telemetry(self, msg):
520 """Called when extraction of VPP telemetry data is required.
522 :param msg: Message to process.
527 if self._telemetry_kw_counter > 1:
529 if not msg.message.count("# TYPE vpp_runtime_calls"):
532 if "telemetry-show-run" not in \
533 self._data["tests"][self._test_id].keys():
534 self._data["tests"][self._test_id]["telemetry-show-run"] = dict()
536 self._telemetry_msg_counter += 1
537 groups = re.search(self.REGEX_SH_RUN_HOST, msg.message)
541 host = groups.group(1)
542 except (AttributeError, IndexError):
545 sock = groups.group(2)
546 except (AttributeError, IndexError):
549 "source_type": "node",
551 "msg_type": "metric",
553 "timestamp": msg.timestamp,
554 "msg": "show_runtime",
559 for line in msg.message.splitlines():
560 if not line.startswith("vpp_runtime_"):
563 params, value, timestamp = line.rsplit(" ", maxsplit=2)
564 cut = params.index("{")
565 name = params[:cut].split("_", maxsplit=2)[-1]
567 "dict" + params[cut:].replace('{', '(').replace('}', ')')
569 labels["graph_node"] = labels.pop("name")
570 runtime["data"].append(
574 "timestamp": timestamp,
578 except (TypeError, ValueError, IndexError):
580 self._data['tests'][self._test_id]['telemetry-show-run']\
581 [f"dut{self._telemetry_msg_counter}"] = copy.copy(
def _get_ndrpdr_throughput(self, msg):
    """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
    message.

    Values which cannot be parsed keep the default -1.0; values parsed
    before a conversion problem are kept.

    :param msg: The test message to be parsed.
    :type msg: str
    :returns: Parsed data as a dict and the status (PASS/FAIL).
    :rtype: tuple(dict, str)
    """

    throughput = {
        "NDR": {"LOWER": -1.0, "UPPER": -1.0},
        "PDR": {"LOWER": -1.0, "UPPER": -1.0}
    }

    found = re.search(self.REGEX_NDRPDR_RATE, msg)
    if found is None:
        return throughput, "FAIL"

    # The regex groups are ordered the same way as these targets.
    targets = (
        ("NDR", "LOWER"), ("NDR", "UPPER"),
        ("PDR", "LOWER"), ("PDR", "UPPER")
    )
    try:
        for group_nr, (rate, bound) in enumerate(targets, start=1):
            throughput[rate][bound] = float(found.group(group_nr))
    except (IndexError, ValueError):
        return throughput, "FAIL"

    return throughput, "PASS"
def _get_ndrpdr_throughput_gbps(self, msg):
    """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER in Gbps from the
    test message.

    Values which cannot be parsed keep the default -1.0; values parsed
    before a conversion problem are kept.

    :param msg: The test message to be parsed.
    :type msg: str
    :returns: Parsed data as a dict and the status (PASS/FAIL).
    :rtype: tuple(dict, str)
    """

    gbps = {
        "NDR": {"LOWER": -1.0, "UPPER": -1.0},
        "PDR": {"LOWER": -1.0, "UPPER": -1.0}
    }

    found = re.search(self.REGEX_NDRPDR_GBPS, msg)
    if found is None:
        return gbps, "FAIL"

    # The regex groups are ordered the same way as these targets.
    targets = (
        ("NDR", "LOWER"), ("NDR", "UPPER"),
        ("PDR", "LOWER"), ("PDR", "UPPER")
    )
    try:
        for group_nr, (rate, bound) in enumerate(targets, start=1):
            gbps[rate][bound] = float(found.group(group_nr))
    except (IndexError, ValueError):
        return gbps, "FAIL"

    return gbps, "PASS"
def _get_plr_throughput(self, msg):
    """Get PLRsearch lower bound and PLRsearch upper bound from the test
    message.

    :param msg: The test message to be parsed.
    :type msg: str
    :returns: Parsed data as a dict and the status (PASS/FAIL).
    :rtype: tuple(dict, str)
    """

    throughput = {
        "LOWER": -1.0,
        "UPPER": -1.0
    }

    found = re.search(self.REGEX_PLR_RATE, msg)
    if found is None:
        return throughput, "FAIL"

    try:
        for group_nr, bound in enumerate(("LOWER", "UPPER"), start=1):
            throughput[bound] = float(found.group(group_nr))
    except (IndexError, ValueError):
        return throughput, "FAIL"

    return throughput, "PASS"
674 def _get_ndrpdr_latency(self, msg):
675 """Get LATENCY from the test message.
677 :param msg: The test message to be parsed.
679 :returns: Parsed data as a dict and the status (PASS/FAIL).
680 :rtype: tuple(dict, str)
690 "direction1": copy.copy(latency_default),
691 "direction2": copy.copy(latency_default)
694 "direction1": copy.copy(latency_default),
695 "direction2": copy.copy(latency_default)
698 "direction1": copy.copy(latency_default),
699 "direction2": copy.copy(latency_default)
702 "direction1": copy.copy(latency_default),
703 "direction2": copy.copy(latency_default)
706 "direction1": copy.copy(latency_default),
707 "direction2": copy.copy(latency_default)
710 "direction1": copy.copy(latency_default),
711 "direction2": copy.copy(latency_default)
715 groups = re.search(self.REGEX_NDRPDR_LAT, msg)
717 groups = re.search(self.REGEX_NDRPDR_LAT_BASE, msg)
719 return latency, "FAIL"
721 def process_latency(in_str):
722 """Return object with parsed latency values.
724 TODO: Define class for the return type.
726 :param in_str: Input string, min/avg/max/hdrh format.
728 :returns: Dict with corresponding keys, except hdrh float values.
730 :throws IndexError: If in_str does not have enough substrings.
731 :throws ValueError: If a substring does not convert to float.
733 in_list = in_str.split('/', 3)
736 "min": float(in_list[0]),
737 "avg": float(in_list[1]),
738 "max": float(in_list[2]),
742 if len(in_list) == 4:
743 rval["hdrh"] = str(in_list[3])
748 latency["NDR"]["direction1"] = process_latency(groups.group(1))
749 latency["NDR"]["direction2"] = process_latency(groups.group(2))
750 latency["PDR"]["direction1"] = process_latency(groups.group(3))
751 latency["PDR"]["direction2"] = process_latency(groups.group(4))
752 if groups.lastindex == 4:
753 return latency, "PASS"
754 except (IndexError, ValueError):
758 latency["PDR90"]["direction1"] = process_latency(groups.group(5))
759 latency["PDR90"]["direction2"] = process_latency(groups.group(6))
760 latency["PDR50"]["direction1"] = process_latency(groups.group(7))
761 latency["PDR50"]["direction2"] = process_latency(groups.group(8))
762 latency["PDR10"]["direction1"] = process_latency(groups.group(9))
763 latency["PDR10"]["direction2"] = process_latency(groups.group(10))
764 latency["LAT0"]["direction1"] = process_latency(groups.group(11))
765 latency["LAT0"]["direction2"] = process_latency(groups.group(12))
766 if groups.lastindex == 12:
767 return latency, "PASS"
768 except (IndexError, ValueError):
771 return latency, "FAIL"
def _get_hoststack_data(msg, tags):
    """Get data from the hoststack test message.

    Single quotes in the message are replaced by double quotes and all
    spaces are removed before the JSON is decoded. For tests tagged
    LDPRELOAD the whole message is one JSON document; for tests tagged
    VPPECHO the message is two concatenated documents, returned under the
    keys "client" and "server".

    :param msg: The test message to be parsed.
    :param tags: Test tags.
    :type msg: str
    :type tags: list
    :returns: Parsed data as a JSON dict and the status (PASS/FAIL).
    :rtype: tuple(dict, str)
    """

    normalized = msg.replace("'", '"').replace(" ", "")

    try:
        if "LDPRELOAD" in tags:
            return loads(normalized), "PASS"
        if "VPPECHO" in tags:
            # Re-insert one space between the two documents to split them.
            parts = normalized.replace("}{", "} {").split(" ")
            parsed = {
                "client": loads(parts[0]),
                "server": loads(parts[1])
            }
            return parsed, "PASS"
    except (JSONDecodeError, IndexError):
        pass

    return {}, "FAIL"
def _get_vsap_data(self, msg, tags):
    """Get data from the vsap test message.

    The sixth value captured by REGEX_VSAP_MSG_INFO is stored as "cps" for
    tests tagged TCP_CPS and as "rps" for tests tagged TCP_RPS. Without
    either tag the values parsed so far are returned with the status FAIL.

    :param msg: The test message to be parsed.
    :param tags: Test tags.
    :type msg: str
    :type tags: list
    :returns: Parsed data as a JSON dict and the status (PASS/FAIL).
    :rtype: tuple(dict, str)
    """

    result = {}
    status = "FAIL"

    found = re.search(self.REGEX_VSAP_MSG_INFO, msg)
    if found is None:
        return result, status

    try:
        result["transfer-rate"] = float(found.group(1)) * 1e3
        result["latency"] = float(found.group(2))
        result["completed-requests"] = int(found.group(3))
        result["failed-requests"] = int(found.group(4))
        result["bytes-transferred"] = int(found.group(5))

        if "TCP_CPS" in tags:
            rate_key = "cps"
        elif "TCP_RPS" in tags:
            rate_key = "rps"
        else:
            return result, status

        result[rate_key] = float(found.group(6))
        status = "PASS"
    except (IndexError, ValueError):
        pass

    return result, status
def visit_suite(self, suite):
    """Implements traversing through the suite and its direct children.

    Unless start_suite() returns False, the setup, the child suites, the
    tests and the teardown are visited in this order and end_suite() is
    called afterwards.

    :param suite: Suite to process.
    :type suite: Suite
    :returns: Nothing.
    """

    if self.start_suite(suite) is False:
        return

    for part in ("setup", "suites", "tests", "teardown"):
        getattr(suite, part).visit(self)
    self.end_suite(suite)
854 def start_suite(self, suite):
855 """Called when suite starts.
857 :param suite: Suite to process.
862 parent_name = suite.parent.name
863 except AttributeError:
866 self._data["suites"][suite.longname.lower().replace('"', "'").\
867 replace(" ", "_")] = {
868 "name": suite.name.lower(),
870 "parent": parent_name,
871 "level": len(suite.longname.split("."))
def visit_test(self, test):
    """Implements traversing through the test.

    Unless start_test() returns False, the setup, the body and the teardown
    are visited in this order and end_test() is called afterwards.

    :param test: Test to process.
    :type test: Test
    :returns: Nothing.
    """

    if self.start_test(test) is False:
        return

    for part in ("setup", "body", "teardown"):
        getattr(test, part).visit(self)
    self.end_test(test)
887 def start_test(self, test):
888 """Called when test starts.
890 :param test: Test to process.
895 self._sh_run_counter = 0
896 self._telemetry_kw_counter = 0
897 self._telemetry_msg_counter = 0
899 longname_orig = test.longname.lower()
901 # Check the ignore list
902 if longname_orig in self._ignore:
905 tags = [str(tag) for tag in test.tags]
908 # Change the TC long name and name if defined in the mapping table
909 longname = self._mapping.get(longname_orig, None)
910 if longname is not None:
911 name = longname.split('.')[-1]
913 longname = longname_orig
914 name = test.name.lower()
916 # Remove TC number from the TC long name (backward compatibility):
917 self._test_id = re.sub(self.REGEX_TC_NUMBER, "", longname)
918 # Remove TC number from the TC name (not needed):
919 test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)
921 test_result["parent"] = test.parent.name.lower()
922 test_result["tags"] = tags
923 test_result["doc"] = test.doc
924 test_result["type"] = ""
925 test_result["status"] = test.status
926 test_result["starttime"] = test.starttime
927 test_result["endtime"] = test.endtime
929 if test.status == "PASS":
931 if "TCP_PPS" in tags or "UDP_PPS" in tags:
932 test_result["msg"] = self._get_data_from_pps_test_msg(
934 elif "TCP_CPS" in tags or "UDP_CPS" in tags:
935 test_result["msg"] = self._get_data_from_cps_test_msg(
938 test_result["msg"] = self._get_data_from_perf_test_msg(
940 elif "MRR" in tags or "FRMOBL" in tags or "BMRR" in tags:
941 test_result["msg"] = self._get_data_from_mrr_test_msg(
944 test_result["msg"] = test.message
946 test_result["msg"] = test.message
948 if "PERFTEST" in tags and "TREX" not in tags:
949 # Replace info about cores (e.g. -1c-) with the info about threads
950 # and cores (e.g. -1t1c-) in the long test case names and in the
951 # test case names if necessary.
954 for tag in test_result["tags"]:
955 groups = re.search(self.REGEX_TC_TAG, tag)
961 self._test_id = re.sub(
962 self.REGEX_TC_NAME_NEW, f"-{tag_tc.lower()}-",
963 self._test_id, count=1
965 test_result["name"] = re.sub(
966 self.REGEX_TC_NAME_NEW, f"-{tag_tc.lower()}-",
967 test_result["name"], count=1
970 test_result["status"] = "FAIL"
971 self._data["tests"][self._test_id] = test_result
973 f"The test {self._test_id} has no or more than one "
974 f"multi-threading tags.\n"
975 f"Tags: {test_result['tags']}"
979 if "DEVICETEST" in tags:
980 test_result["type"] = "DEVICETEST"
981 elif "NDRPDR" in tags:
982 if "TCP_CPS" in tags or "UDP_CPS" in tags:
983 test_result["type"] = "CPS"
985 test_result["type"] = "NDRPDR"
986 if test.status == "PASS":
987 test_result["throughput"], test_result["status"] = \
988 self._get_ndrpdr_throughput(test.message)
989 test_result["gbps"], test_result["status"] = \
990 self._get_ndrpdr_throughput_gbps(test.message)
991 test_result["latency"], test_result["status"] = \
992 self._get_ndrpdr_latency(test.message)
993 elif "MRR" in tags or "FRMOBL" in tags or "BMRR" in tags:
995 test_result["type"] = "MRR"
997 test_result["type"] = "BMRR"
998 if test.status == "PASS":
999 test_result["result"] = dict()
1000 groups = re.search(self.REGEX_BMRR, test.message)
1001 if groups is not None:
1002 items_str = groups.group(1)
1004 float(item.strip().replace("'", ""))
1005 for item in items_str.split(",")
1007 # Use whole list in CSIT-1180.
1008 stats = jumpavg.AvgStdevStats.for_runs(items_float)
1009 test_result["result"]["samples"] = items_float
1010 test_result["result"]["receive-rate"] = stats.avg
1011 test_result["result"]["receive-stdev"] = stats.stdev
1013 groups = re.search(self.REGEX_MRR, test.message)
1014 test_result["result"]["receive-rate"] = \
1015 float(groups.group(3)) / float(groups.group(1))
1016 elif "SOAK" in tags:
1017 test_result["type"] = "SOAK"
1018 if test.status == "PASS":
1019 test_result["throughput"], test_result["status"] = \
1020 self._get_plr_throughput(test.message)
1021 elif "LDP_NGINX" in tags:
1022 test_result["type"] = "LDP_NGINX"
1023 test_result["result"], test_result["status"] = \
1024 self._get_vsap_data(test.message, tags)
1025 elif "HOSTSTACK" in tags:
1026 test_result["type"] = "HOSTSTACK"
1027 if test.status == "PASS":
1028 test_result["result"], test_result["status"] = \
1029 self._get_hoststack_data(test.message, tags)
1030 elif "RECONF" in tags:
1031 test_result["type"] = "RECONF"
1032 if test.status == "PASS":
1033 test_result["result"] = None
1035 grps_loss = re.search(self.REGEX_RECONF_LOSS, test.message)
1036 grps_time = re.search(self.REGEX_RECONF_TIME, test.message)
1037 test_result["result"] = {
1038 "loss": int(grps_loss.group(1)),
1039 "time": float(grps_time.group(1))
1041 except (AttributeError, IndexError, ValueError, TypeError):
1042 test_result["status"] = "FAIL"
1044 test_result["status"] = "FAIL"
1046 self._data["tests"][self._test_id] = test_result
1048 def visit_keyword(self, kw):
1049 """Implements traversing through the keyword and its child keywords.
1051 :param keyword: Keyword to process.
1052 :type keyword: Keyword
1055 if self.start_keyword(kw) is not False:
1056 if hasattr(kw, "body"):
1058 kw.teardown.visit(self)
1059 self.end_keyword(kw)
def start_keyword(self, keyword):
    """Called when keyword starts. Remembers the name of the keyword so
    that its messages can be dispatched by start_message().

    :param keyword: Keyword to process.
    :type keyword: Keyword
    :returns: Nothing.
    """

    current_name = keyword.name
    self._kw_name = current_name
def end_keyword(self, keyword):
    """Called when keyword ends. Forgets the name of the currently
    processed keyword.

    :param keyword: Keyword to process. Not used.
    :type keyword: Keyword
    :returns: Nothing.
    """

    del keyword  # Part of the visitor interface, not needed here.
    self._kw_name = None
def visit_message(self, msg):
    """Implements visiting the message.

    end_message() is called unless start_message() returns False.

    :param msg: Message to process.
    :type msg: Message
    :returns: Nothing.
    """

    if self.start_message(msg) is False:
        return
    self.end_message(msg)
def start_message(self, msg):
    """Called when message starts. Get required information from messages,
    depending on the keyword which produced them:

    - telemetry and show runtime data, only if processing of operational
      data was requested when the checker was created,
    - VPP or DPDK version and testbed, only until the first value is found,
    - PAPI command history.

    :param msg: Message to process.
    :type msg: Message
    :returns: Nothing.
    """

    kw_name = self._kw_name
    if kw_name is None:
        return

    if "Run Telemetry On All Duts" in kw_name:
        # Operational data is processed only on request (process_oper).
        if self._process_oper:
            self._telemetry_kw_counter += 1
            self._get_telemetry(msg)
    elif "Show Runtime On All Duts" in kw_name:
        if self._process_oper:
            self._sh_run_counter += 1
            self._get_show_run(msg)
    elif "Show Vpp Version On All Duts" in kw_name:
        if not self._version:
            self._get_vpp_version(msg)
    elif "Install Dpdk Framework On All Duts" in kw_name:
        if not self._version:
            self._get_dpdk_version(msg)
    elif "Setup Framework" in kw_name:
        if not self._testbed:
            self._get_testbed(msg)
    elif "Show Papi History On All Duts" in kw_name:
        self._conf_history_lookup_nr = 0
        self._get_papi_history(msg)
1123 The data is extracted from output.xml files generated by Jenkins jobs and
1124 stored in pandas' DataFrames.
1130 (as described in ExecutionChecker documentation)
1132 (as described in ExecutionChecker documentation)
1134 (as described in ExecutionChecker documentation)
def __init__(self, spec, for_output):
    """Initialization.

    :param spec: Specification.
    :param for_output: Output to be generated from downloaded data.
    :type spec: Specification
    :type for_output: str
    """

    # Data store, filled in while the input files are parsed
    self._input_data = pd.Series(dtype="float64")

    # Specification
    self._cfg = spec

    # Output to be generated
    self._for_output = for_output
1156 """Getter - Input data.
1158 :returns: Input data
1159 :rtype: pandas.Series
1161 return self._input_data
def metadata(self, job, build):
    """Getter - metadata

    Builds are stored under the string form of the build number, so the
    build is converted to a string before the lookup. This way both
    ``metadata(job, 7)`` and ``metadata(job, "7")`` address the same build.

    :param job: Job which metadata we want.
    :param build: Build which metadata we want.
    :type job: str
    :type build: str or int
    :returns: Metadata
    :rtype: pandas.Series
    """
    return self.data[job][str(build)]["metadata"]
def suites(self, job, build):
    """Getter - suites

    The build is converted to a string before it is used as a key.

    :param job: Job which suites we want.
    :param build: Build which suites we want.
    :type job: str
    :type build: str
    :returns: Suites.
    :rtype: pandas.Series
    """
    build_key = str(build)
    job_data = self.data[job]
    return job_data[build_key]["suites"]
1187 def tests(self, job, build):
1190 :param job: Job which tests we want.
1191 :param build: Build which tests we want.
1195 :rtype: pandas.Series
1197 return self.data[job][build]["tests"]
1199 def _parse_tests(self, job, build):
1200 """Process data from robot output.xml file and return JSON structured
1203 :param job: The name of job which build output data will be processed.
1204 :param build: The build which output data will be processed.
1207 :returns: JSON data structure.
1216 with open(build["file-name"], 'r') as data_file:
1218 result = ExecutionResult(data_file)
1219 except errors.DataError as err:
1221 f"Error occurred while parsing output.xml: {repr(err)}"
1225 process_oper = False
1226 if "-vpp-perf-report-coverage-" in job:
1228 # elif "-vpp-perf-report-iterative-" in job:
1229 # # Exceptions for TBs where we do not have coverage data:
1230 # for item in ("-2n-icx", ):
1232 # process_oper = True
1234 checker = ExecutionChecker(
1235 metadata, self._cfg.mapping, self._cfg.ignore, process_oper
1237 result.visit(checker)
1239 checker.data["metadata"]["tests_total"] = \
1240 result.statistics.total.total
1241 checker.data["metadata"]["tests_passed"] = \
1242 result.statistics.total.passed
1243 checker.data["metadata"]["tests_failed"] = \
1244 result.statistics.total.failed
1245 checker.data["metadata"]["elapsedtime"] = result.suite.elapsedtime
1246 checker.data["metadata"]["generated"] = result.suite.endtime[:14]
1250 def _download_and_parse_build(self, job, build, repeat, pid=10000):
1251 """Download and parse the input data file.
1253 :param pid: PID of the process executing this method.
1254 :param job: Name of the Jenkins job which generated the processed input
1256 :param build: Information about the Jenkins build which generated the
1257 processed input file.
1258 :param repeat: Repeat the download specified number of times if not
1266 logging.info(f"Processing the job/build: {job}: {build['build']}")
1273 success = download_and_unzip_data_file(self._cfg, job, build, pid)
1279 f"It is not possible to download the input data file from the "
1280 f"job {job}, build {build['build']}, or it is damaged. "
1284 logging.info(f" Processing data from build {build['build']}")
1285 data = self._parse_tests(job, build)
1288 f"Input data file from the job {job}, build "
1289 f"{build['build']} is damaged. Skipped."
1295 remove(build["file-name"])
1296 except OSError as err:
1298 f"Cannot remove the file {build['file-name']}: {repr(err)}"
1301 # If the time-period is defined in the specification file, remove all
1302 # files which are outside the time period.
1304 timeperiod = self._cfg.environment.get("time-period", None)
1305 if timeperiod and data:
1307 timeperiod = timedelta(int(timeperiod))
1308 metadata = data.get("metadata", None)
1310 generated = metadata.get("generated", None)
1312 generated = dt.strptime(generated, "%Y%m%d %H:%M")
1313 if (now - generated) > timeperiod:
1314 # Remove the data and the file:
1319 f" The build {job}/{build['build']} is "
1320 f"outdated, will be removed."
def download_and_parse_data(self, repeat=1):
    """Download the input data files, parse input data from input files and
    store in pandas' Series.

    Each build listed in ``self._cfg.input`` is passed to
    ``self._download_and_parse_build``. When the result carries data, its
    "metadata", "suites" and "tests" sections are converted to pandas Series
    and stored in ``self._input_data[job][str(build_nr)]``; the input file
    name and the resulting state are recorded in the specification.
    Finally, the number of successful downloads per data source is logged.

    :param repeat: Repeat the download specified number of times if not
        successful.
    :type repeat: int
    """

    logging.info("Downloading and parsing input files ...")

    for job, builds in self._cfg.input.items():
        for build in builds:
            result = self._download_and_parse_build(job, build, repeat)
            # NOTE(review): the "last" flag is presumed to be set by
            # _download_and_parse_build for a build outside the configured
            # time period; the remaining builds of the job are then
            # skipped - confirm against that method.
            if result.get("last"):
                break
            build_nr = result["build"]["build"]

            data = result["data"]
            if data:
                # One Series per section, keyed by the section's own keys.
                build_data = pd.Series({
                    section: pd.Series(
                        list(data[section].values()),
                        index=list(data[section].keys())
                    )
                    for section in ("metadata", "suites", "tests")
                })

                if self._input_data.get(job, None) is None:
                    self._input_data[job] = pd.Series(dtype="float64")
                self._input_data[job][str(build_nr)] = build_data
                self._cfg.set_input_file_name(
                    job, build_nr, result["build"]["file-name"]
                )
            # The state is recorded even when no data was obtained.
            self._cfg.set_input_state(job, build_nr, result["state"])

            mem_alloc = \
                resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
            logging.info(f"Memory allocation: {mem_alloc:.0f}MB")

    logging.info("Done.")

    # Only the sources with at least one successful download are reported.
    msg = "Successful downloads from the sources:\n" + "".join(
        f"{source['url']}/{source['path']}/"
        f"{source['file-name']}: "
        f"{source['successful-downloads']}\n"
        for source in self._cfg.environment["data-sources"]
        if source["successful-downloads"]
    )
    logging.info(msg)
def process_local_file(self, local_file, job="local", build_nr=1,
                       replace=True):
    """Process local XML file given as a command-line parameter.

    If the file name (without directories and extension) is an integer, it
    is used as the build number instead of ``build_nr``.

    :param local_file: The file to process.
    :param job: Job name.
    :param build_nr: Build number.
    :param replace: If True, the information about jobs and builds is
        replaced by the new one, otherwise the new jobs and builds are
        added.
    :type local_file: str
    :type job: str
    :type build_nr: int
    :type replace: bool
    :raises: PresentationError if an error occurs.
    """
    if not isfile(local_file):
        raise PresentationError(f"The file {local_file} does not exist.")

    # Prefer the build number encoded in the file name, e.g. "12.xml" -> 12;
    # keep the given build_nr when the name is not numeric.
    try:
        build_nr = int(local_file.split("/")[-1].split(".")[0])
    except (IndexError, ValueError):
        pass

    # NOTE(review): the initial "status" value is reconstructed; only the
    # "build" and "file-name" keys are confirmed by their use elsewhere in
    # this class - confirm.
    build = {
        "build": build_nr,
        "status": "failed",
        "file-name": local_file
    }
    if replace:
        self._cfg.input = {}
    self._cfg.add_build(job, build)

    logging.info(f"Processing {job}: {build_nr:2d}: {local_file}")
    data = self._parse_tests(job, build)
    if data is None:
        raise PresentationError(
            f"Error occurred while parsing the file {local_file}"
        )

    # One Series per section, keyed by the section's own keys.
    build_data = pd.Series({
        section: pd.Series(
            list(data[section].values()),
            index=list(data[section].keys())
        )
        for section in ("metadata", "suites", "tests")
    })

    if self._input_data.get(job, None) is None:
        self._input_data[job] = pd.Series(dtype="float64")
    self._input_data[job][str(build_nr)] = build_data

    self._cfg.set_input_state(job, build_nr, "processed")
def process_local_directory(self, local_dir, replace=True):
    """Process local directory with XML file(s). The directory is processed
    as a 'job' and the XML files in it as builds.
    If the given directory contains only sub-directories, these
    sub-directories are processed as jobs and the files in them as builds.

    Files (and sub-directories) are processed in sorted order, so the
    sequential build numbers passed to ``process_local_file`` do not depend
    on the order in which the file system lists the entries.

    :param local_dir: Local directory to process.
    :param replace: If True, the information about jobs and builds is
        replaced by the new one, otherwise the new jobs and builds are
        added.
    :type local_dir: str
    :type replace: bool
    :raises: PresentationError if the directory does not exist, is empty or
        includes both files and directories.
    """
    if not isdir(local_dir):
        raise PresentationError(
            f"The directory {local_dir} does not exist."
        )

    # Check if the given directory includes only files, or only directories
    _, dirnames, filenames = next(walk(local_dir))

    # local_builds:
    # key: dir (job) name, value: list of file names (builds)
    if filenames and not dirnames:
        local_builds = {
            local_dir: [join(local_dir, name) for name in sorted(filenames)]
        }
    elif dirnames and not filenames:
        local_builds = {}
        for dirname in sorted(dirnames):
            builds = sorted(
                join(local_dir, dirname, name)
                for name in listdir(join(local_dir, dirname))
                if isfile(join(local_dir, dirname, name))
            )
            # A sub-directory without files does not create a job.
            if builds:
                local_builds[dirname] = builds
    elif not filenames and not dirnames:
        raise PresentationError(f"The directory {local_dir} is empty.")
    else:
        raise PresentationError(
            f"The directory {local_dir} can include only files or only "
            f"directories, not both.\nThe directory {local_dir} includes "
            f"file(s):\n{filenames}\nand directories:\n{dirnames}"
        )

    if replace:
        self._cfg.input = {}

    # The input was (optionally) reset once above, so every file is added
    # with replace=False.
    for job, files in local_builds.items():
        for build_nr, local_file in enumerate(files, start=1):
            self.process_local_file(local_file, job, build_nr, replace=False)
1512 def _end_of_tag(tag_filter, start=0, closer="'"):
1513 """Return the index of character in the string which is the end of tag.
1515 :param tag_filter: The string where the end of tag is being searched.
1516 :param start: The index where the searching is stated.
1517 :param closer: The character which is the tag closer.
1518 :type tag_filter: str
1521 :returns: The index of the tag closer.
1525 idx_opener = tag_filter.index(closer, start)
1526 return tag_filter.index(closer, idx_opener + 1)
1531 def _condition(tag_filter):
1532 """Create a conditional statement from the given tag filter.
1534 :param tag_filter: Filter based on tags from the element specification.
1535 :type tag_filter: str
1536 :returns: Conditional statement which can be evaluated.
1541 index = InputData._end_of_tag(tag_filter, index)
1545 tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
def filter_data(self, element, params=None, data=None, data_set="tests",
                continue_on_error=False):
    """Filter required data from the given jobs and builds.

    The output data structure is:
    - job 1
      - build 1
        - test (or suite) 1 ID:
          - param 1
          ...
          - param n
        ...
        - test (or suite) n ID:
        ...
      ...
      - build n
    ...
    - job n

    :param element: Element which will use the filtered data.
    :param params: Parameters which will be included in the output. If None,
        the "parameters" from the element are used (extended by "type" and
        "status"); if the element defines none, all parameters are included.
    :param data: If not None, this data is used instead of data specified
        in the element.
    :param data_set: The set of data to be filtered: tests, suites,
        metadata.
    :param continue_on_error: Continue if there is error while reading the
        data. The Item will be empty then
    :type element: pandas.Series
    :type params: list
    :type data: dict
    :type data_set: str
    :type continue_on_error: bool
    :returns: Filtered data, or None if the filter is not defined or an
        error occurred.
    :rtype pandas.Series
    """

    try:
        # Suites are never filtered by tags; "all" and "template" select
        # everything as well.
        if data_set == "suites" or element["filter"] in ("all", "template"):
            cond = "True"
        else:
            cond = InputData._condition(element["filter"])
        logging.debug(f" Filter: {cond}")
    except KeyError:
        logging.error(" No filter defined.")
        return None

    if params is None:
        params = element.get("parameters", None)
        if params:
            # Work on a copy: extending the list in place would grow the
            # element specification on every call.
            params = list(params) + [
                extra for extra in ("type", "status") if extra not in params
            ]

    data_to_filter = data if data else element["data"]
    data = pd.Series(dtype="float64")
    try:
        for job, builds in data_to_filter.items():
            data[job] = pd.Series(dtype="float64")
            for build in builds:
                data[job][str(build)] = pd.Series(dtype="float64")
                try:
                    data_dict = dict(
                        self.data[job][str(build)][data_set].items())
                except KeyError:
                    if continue_on_error:
                        continue
                    return None

                for test_id, test_data in data_dict.items():
                    # NOTE(review): the condition is built from the filter
                    # in the element specification and evaluated with
                    # eval(); the specification must be trusted input.
                    if not eval(cond, {"tags": test_data.get("tags", "")}):
                        continue
                    data[job][str(build)][test_id] = \
                        pd.Series(dtype="float64")
                    if params is None:
                        for param, val in test_data.items():
                            data[job][str(build)][test_id][param] = val
                    else:
                        for param in params:
                            try:
                                data[job][str(build)][test_id][param] = \
                                    test_data[param]
                            except KeyError:
                                data[job][str(build)][test_id][param] = \
                                    "No Data"
        return data

    except (KeyError, IndexError, ValueError) as err:
        logging.error(
            f"Missing mandatory parameter in the element specification: "
            f"{repr(err)}"
        )
        return None
    except AttributeError as err:
        logging.error(repr(err))
        return None
    except SyntaxError as err:
        logging.error(
            f"The filter {cond} is not correct. Check if all tags are "
            f"enclosed by apostrophes.\n{repr(err)}"
        )
        return None
def filter_tests_by_name(self, element, params=None, data_set="tests",
                         continue_on_error=False):
    """Filter required data from the given jobs and builds.

    The items are selected by their IDs: each entry of the element's
    "include" list is used as a regular expression matched (case
    insensitive, from the beginning) against the item IDs. If the element
    defines "core", every "include" entry is formatted with each core value.

    The output data structure is:
    - job 1
      - build 1
        - test (or suite) 1 ID:
          - param 1
          ...
          - param n
        ...
        - test (or suite) n ID:
        ...
      ...
      - build n
    ...
    - job n

    :param element: Element which will use the filtered data.
    :param params: Parameters which will be included in the output. If None,
        the "parameters" from the element are used (extended by "type"); if
        the element defines none, all parameters are included.
    :param data_set: The set of data to be filtered: tests, suites,
        metadata.
    :param continue_on_error: Continue if there is error while reading the
        data. The Item will be empty then
    :type element: pandas.Series
    :type params: list
    :type data_set: str
    :type continue_on_error: bool
    :returns: Filtered data, or None if there is nothing to include or an
        error occurred.
    :rtype pandas.Series
    """

    include = element.get("include", None)
    if not include:
        logging.warning("No tests to include, skipping the element.")
        return None

    if params is None:
        params = element.get("parameters", None)
        if params and "type" not in params:
            # Work on a copy so that the element specification itself is
            # not modified.
            params = list(params) + ["type"]

    cores = element.get("core", None)
    if cores:
        tests = [
            test.format(core=core) for core in cores for test in include
        ]
    else:
        tests = include

    data = pd.Series(dtype="float64")
    try:
        for job, builds in element["data"].items():
            data[job] = pd.Series(dtype="float64")
            for build in builds:
                data[job][str(build)] = pd.Series(dtype="float64")
                for test in tests:
                    try:
                        reg_ex = re.compile(str(test).lower())
                        for test_id in self.data[job][
                                str(build)][data_set].keys():
                            if not reg_ex.match(str(test_id).lower()):
                                continue
                            test_data = self.data[job][
                                str(build)][data_set][test_id]
                            data[job][str(build)][test_id] = \
                                pd.Series(dtype="float64")
                            if params is None:
                                for param, val in test_data.items():
                                    data[job][str(build)][
                                        test_id][param] = val
                            else:
                                for param in params:
                                    try:
                                        data[job][str(build)][
                                            test_id][param] = \
                                            test_data[param]
                                    except KeyError:
                                        data[job][str(build)][
                                            test_id][param] = "No Data"
                    except KeyError as err:
                        # The job / build / data set is not available.
                        if continue_on_error:
                            logging.debug(repr(err))
                            continue
                        logging.error(repr(err))
                        return None
        return data

    except (KeyError, IndexError, ValueError) as err:
        logging.error(
            f"Missing mandatory parameter in the element "
            f"specification: {repr(err)}"
        )
        return None
    except AttributeError as err:
        logging.error(repr(err))
        return None
def merge_data(data):
    """Merge data from more jobs and builds to a simple data structure.

    The output data structure is:
    - test (suite) 1 ID:
      - param 1
      ...
      - param n
    ...
    - test (suite) n ID:
    ...

    If the same ID is present in more builds, the item processed last
    overwrites the previous ones.

    :param data: Data to merge.
    :type data: pandas.Series
    :returns: Merged data.
    :rtype: pandas.Series
    """

    logging.info(" Merging data ...")

    merged_data = pd.Series(dtype="float64")
    # data -> jobs -> builds -> {item ID: item data}
    for builds in data.values:
        for item in builds.values:
            for item_id, item_data in item.items():
                merged_data[item_id] = item_data
    return merged_data
1781 def print_all_oper_data(self):
1782 """Print all operational data to console.
1785 for job in self._input_data.values:
1786 for build in job.values:
1787 for test_id, test_data in build["tests"].items():
1789 if test_data.get("show-run", None) is None:
1791 for dut_name, data in test_data["show-run"].items():
1792 if data.get("runtime", None) is None:
1794 runtime = loads(data["runtime"])
1796 threads_nr = len(runtime[0]["clocks"])
1797 except (IndexError, KeyError):
1799 threads = OrderedDict(
1800 {idx: list() for idx in range(threads_nr)})
1801 for item in runtime:
1802 for idx in range(threads_nr):
1803 if item["vectors"][idx] > 0:
1804 clocks = item["clocks"][idx] / \
1805 item["vectors"][idx]
1806 elif item["calls"][idx] > 0:
1807 clocks = item["clocks"][idx] / \
1809 elif item["suspends"][idx] > 0:
1810 clocks = item["clocks"][idx] / \
1811 item["suspends"][idx]
1815 if item["calls"][idx] > 0:
1816 vectors_call = item["vectors"][idx] / \
1821 if int(item["calls"][idx]) + int(
1822 item["vectors"][idx]) + \
1823 int(item["suspends"][idx]):
1824 threads[idx].append([
1827 item["vectors"][idx],
1828 item["suspends"][idx],
1833 print(f"Host IP: {data.get('host', '')}, "
1834 f"Socket: {data.get('socket', '')}")
1835 for thread_nr, thread in threads.items():
1836 txt_table = prettytable.PrettyTable(
1842 "Cycles per Packet",
1843 "Average Vector Size"
1848 txt_table.add_row(row)
1850 if len(thread) == 0:
1853 avg = f", Average Vector Size per Node: " \
1854 f"{(avg / len(thread)):.2f}"
1855 th_name = "main" if thread_nr == 0 \
1856 else f"worker_{thread_nr}"
1857 print(f"{dut_name}, {th_name}{avg}")
1858 txt_table.float_format = ".2"
1859 txt_table.align = "r"
1860 txt_table.align["Name"] = "l"
1861 print(f"{txt_table.get_string()}\n")