PAL: fix in ExecutionChecker
csit.git: resources/tools/presentation/input_data_parser.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store it
17   in pandas' Series,
18 - provide access to the data,
19 - filter the data using tags.
20 """
21
22 import re
23 import copy
24 import resource
25 import logging
26
27 from collections import OrderedDict
28 from os import remove, walk, listdir
29 from os.path import isfile, isdir, join
30 from datetime import datetime as dt
31 from datetime import timedelta
32 from json import loads
33 from json.decoder import JSONDecodeError
34
35 import hdrh.histogram
36 import hdrh.codec
37 import prettytable
38 import pandas as pd
39
40 from robot.api import ExecutionResult, ResultVisitor
41 from robot import errors
42
43 from resources.libraries.python import jumpavg
44 from input_data_files import download_and_unzip_data_file
45 from pal_errors import PresentationError
46
47
48 # Separator used in file names
49 SEPARATOR = "__"
50
51
52 class ExecutionChecker(ResultVisitor):
53     """Class to traverse through the test suite structure.
54     """
55
56     REGEX_PLR_RATE = re.compile(
57         r'PLRsearch lower bound::?\s(\d+.\d+).*\n'
58         r'PLRsearch upper bound::?\s(\d+.\d+)'
59     )
60     REGEX_NDRPDR_RATE = re.compile(
61         r'NDR_LOWER:\s(\d+.\d+).*\n.*\n'
62         r'NDR_UPPER:\s(\d+.\d+).*\n'
63         r'PDR_LOWER:\s(\d+.\d+).*\n.*\n'
64         r'PDR_UPPER:\s(\d+.\d+)'
65     )
66     REGEX_NDRPDR_GBPS = re.compile(
67         r'NDR_LOWER:.*,\s(\d+.\d+).*\n.*\n'
68         r'NDR_UPPER:.*,\s(\d+.\d+).*\n'
69         r'PDR_LOWER:.*,\s(\d+.\d+).*\n.*\n'
70         r'PDR_UPPER:.*,\s(\d+.\d+)'
71     )
72     REGEX_PERF_MSG_INFO = re.compile(
73         r'NDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
74         r'PDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
75         r'Latency at 90% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
76         r'Latency at 50% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
77         r'Latency at 10% PDR:.*\[\'(.*)\', \'(.*)\'\].*\n'
78     )
79     REGEX_CPS_MSG_INFO = re.compile(
80         r'NDR_LOWER:\s(\d+.\d+)\s.*\s.*\n.*\n.*\n'
81         r'PDR_LOWER:\s(\d+.\d+)\s.*\s.*\n.*\n.*'
82     )
83     REGEX_PPS_MSG_INFO = re.compile(
84         r'NDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*\n'
85         r'PDR_LOWER:\s(\d+.\d+)\s.*\s(\d+.\d+)\s.*\n.*\n.*'
86     )
87     REGEX_MRR_MSG_INFO = re.compile(r'.*\[(.*)\]')
88
89     REGEX_VSAP_MSG_INFO = re.compile(
90         r'Transfer Rate: (\d*.\d*).*\n'
91         r'Latency: (\d*.\d*).*\n'
92         r'Completed requests: (\d*).*\n'
93         r'Failed requests: (\d*).*\n'
94         r'Total data transferred: (\d*).*\n'
95         r'Connection [cr]ps rate:\s*(\d*.\d*)'
96     )
97
98     # Needed for CPS and PPS tests
99     REGEX_NDRPDR_LAT_BASE = re.compile(
100         r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
101         r'LATENCY.*\[\'(.*)\', \'(.*)\'\]'
102     )
103     REGEX_NDRPDR_LAT = re.compile(
104         r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
105         r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n'
106         r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
107         r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
108         r'Latency.*\[\'(.*)\', \'(.*)\'\]\s\n'
109         r'Latency.*\[\'(.*)\', \'(.*)\'\]'
110     )
111
112     REGEX_VERSION_VPP = re.compile(
113         r"(VPP Version:\s*|VPP version:\s*)(.*)"
114     )
115     REGEX_VERSION_DPDK = re.compile(
116         r"(DPDK version:\s*|DPDK Version:\s*)(.*)"
117     )
118     REGEX_TCP = re.compile(
119         r'Total\s(rps|cps|throughput):\s(\d*).*$'
120     )
121     REGEX_MRR = re.compile(
122         r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
123         r'tx\s(\d*),\srx\s(\d*)'
124     )
125     REGEX_BMRR = re.compile(
126         r'.*trial results.*: \[(.*)\]'
127     )
128     REGEX_RECONF_LOSS = re.compile(
129         r'Packets lost due to reconfig: (\d*)'
130     )
131     REGEX_RECONF_TIME = re.compile(
132         r'Implied time lost: (\d*.[\de-]*)'
133     )
134     REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
135
136     REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
137
138     REGEX_TC_NUMBER = re.compile(r'tc\d{2}-')
139
140     REGEX_TC_PAPI_CLI = re.compile(r'.*\((\d+.\d+.\d+.\d+.) - (.*)\)')
141
142     REGEX_SH_RUN_HOST = re.compile(
143         r'hostname=\"(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})\",hook=\"(.*)\"'
144     )
145
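    # Illustrative sketch (not part of the original module): REGEX_NDRPDR_RATE
    # expects a result message shaped roughly like the hypothetical text below,
    # with one extra line after each *_LOWER entry:
    #
    # >>> sample = (
    # ...     "NDR_LOWER: 12340000.0 pps\nsome detail line\n"
    # ...     "NDR_UPPER: 12350000.0 pps\n"
    # ...     "PDR_LOWER: 23450000.0 pps\nsome detail line\n"
    # ...     "PDR_UPPER: 23460000.0 pps"
    # ... )
    # >>> groups = ExecutionChecker.REGEX_NDRPDR_RATE.search(sample)
    # >>> [float(groups.group(i)) for i in range(1, 5)]
    # [12340000.0, 12350000.0, 23450000.0, 23460000.0]
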
146     def __init__(self, metadata, mapping, ignore, process_oper):
147         """Initialisation.
148
149         :param metadata: Key-value pairs to be included in "metadata" part of
150             JSON structure.
151         :param mapping: Mapping of the old names of test cases to the new
152             (actual) ones.
153         :param ignore: List of TCs to be ignored.
154         :param process_oper: If True, operational data (show run, telemetry) is
155             processed.
156         :type metadata: dict
157         :type mapping: dict
158         :type ignore: list
159         :type process_oper: bool
160         """
161
162         # Mapping of TC long names
163         self._mapping = mapping
164
165         # Ignore list
166         self._ignore = ignore
167
168         # Process operational data
169         self._process_oper = process_oper
170
171         # Name of currently processed keyword
172         self._kw_name = None
173
174         # VPP version
175         self._version = None
176
177         # Timestamp
178         self._timestamp = None
179
180         # Testbed. The testbed is identified by TG node IP address.
181         self._testbed = None
182
183         # Number of PAPI History messages found:
184         # 0 - no message
185         # 1 - PAPI History of DUT1
186         # 2 - PAPI History of DUT2
187         self._conf_history_lookup_nr = 0
188
189         self._sh_run_counter = 0
190         self._telemetry_kw_counter = 0
191         self._telemetry_msg_counter = 0
192
193         # Test ID of the currently processed test - the lowercase full path
194         # to the test.
195         self._test_id = None
196
197         # The main data structure
198         self._data = {
199             "metadata": dict(),
200             "suites": dict(),
201             "tests": dict()
202         }
203
204         # Save the provided metadata
205         for key, val in metadata.items():
206             self._data["metadata"][key] = val
207
208     @property
209     def data(self):
210         """Getter - Data parsed from the XML file.
211
212         :returns: Data parsed from the XML file.
213         :rtype: dict
214         """
215         return self._data
216
217     def _get_data_from_mrr_test_msg(self, msg):
218         """Get info from message of MRR performance tests.
219
220         :param msg: Message to be processed.
221         :type msg: str
222         :returns: Processed message or "Test Failed." if a problem occurs.
223         :rtype: str
224         """
225
226         groups = re.search(self.REGEX_MRR_MSG_INFO, msg)
227         if not groups or groups.lastindex != 1:
228             return "Test Failed."
229
230         try:
231             data = groups.group(1).split(", ")
232         except (AttributeError, IndexError, ValueError, KeyError):
233             return "Test Failed."
234
235         out_str = "["
236         try:
237             for item in data:
238                 out_str += f"{(float(item) / 1e6):.2f}, "
239             return out_str[:-2] + "]"
240         except (AttributeError, IndexError, ValueError, KeyError):
241             return "Test Failed."
242
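    # Illustrative sketch (not part of the original module); the message text
    # below is hypothetical, only the bracketed list of per-trial rates in pps
    # matters to the parser:
    #
    # >>> checker = ExecutionChecker(dict(), dict(), list(), False)
    # >>> checker._get_data_from_mrr_test_msg(
    # ...     "Maximum Receive Rate trial results in pps: [1200000.0, 1300000.0]"
    # ... )
    # '[1.20, 1.30]'
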
243     def _get_data_from_cps_test_msg(self, msg):
244         """Get info from message of NDRPDR CPS tests.
245
246         :param msg: Message to be processed.
247         :type msg: str
248         :returns: Processed message or "Test Failed." if a problem occurs.
249         :rtype: str
250         """
251
252         groups = re.search(self.REGEX_CPS_MSG_INFO, msg)
253         if not groups or groups.lastindex != 2:
254             return "Test Failed."
255
256         try:
257             return (
258                 f"1. {(float(groups.group(1)) / 1e6):5.2f}\n"
259                 f"2. {(float(groups.group(2)) / 1e6):5.2f}"
260             )
261         except (AttributeError, IndexError, ValueError, KeyError):
262             return "Test Failed."
263
264     def _get_data_from_pps_test_msg(self, msg):
265         """Get info from message of NDRPDR PPS tests.
266
267         :param msg: Message to be processed.
268         :type msg: str
269         :returns: Processed message or "Test Failed." if a problem occurs.
270         :rtype: str
271         """
272
273         groups = re.search(self.REGEX_PPS_MSG_INFO, msg)
274         if not groups or groups.lastindex != 4:
275             return "Test Failed."
276
277         try:
278             return (
279                 f"1. {(float(groups.group(1)) / 1e6):5.2f}      "
280                 f"{float(groups.group(2)):5.2f}\n"
281                 f"2. {(float(groups.group(3)) / 1e6):5.2f}      "
282                 f"{float(groups.group(4)):5.2f}"
283             )
284         except (AttributeError, IndexError, ValueError, KeyError):
285             return "Test Failed."
286
287     def _get_data_from_perf_test_msg(self, msg):
288         """Get info from message of NDRPDR performance tests.
289
290         :param msg: Message to be processed.
291         :type msg: str
292         :returns: Processed message or "Test Failed." if a problem occurs.
293         :rtype: str
294         """
295
296         groups = re.search(self.REGEX_PERF_MSG_INFO, msg)
297         if not groups or groups.lastindex != 10:
298             return "Test Failed."
299
300         try:
301             data = {
302                 "ndr_low": float(groups.group(1)),
303                 "ndr_low_b": float(groups.group(2)),
304                 "pdr_low": float(groups.group(3)),
305                 "pdr_low_b": float(groups.group(4)),
306                 "pdr_lat_90_1": groups.group(5),
307                 "pdr_lat_90_2": groups.group(6),
308                 "pdr_lat_50_1": groups.group(7),
309                 "pdr_lat_50_2": groups.group(8),
310                 "pdr_lat_10_1": groups.group(9),
311                 "pdr_lat_10_2": groups.group(10),
312             }
313         except (AttributeError, IndexError, ValueError, KeyError):
314             return "Test Failed."
315
316         def _process_lat(in_str_1, in_str_2):
317             """Extract P50, P90 and P99 latencies or min, avg, max values from
318             latency string.
319
320             :param in_str_1: Latency string for one direction produced by robot
321                 framework.
322             :param in_str_2: Latency string for second direction produced by
323                 robot framework.
324             :type in_str_1: str
325             :type in_str_2: str
326             :returns: Processed latency string or None if a problem occurs.
327             :rtype: tuple
328             """
329             in_list_1 = in_str_1.split('/', 3)
330             in_list_2 = in_str_2.split('/', 3)
331
332             if len(in_list_1) != 4 and len(in_list_2) != 4:
333                 return None
334
335             in_list_1[3] += "=" * (len(in_list_1[3]) % 4)
336             try:
337                 hdr_lat_1 = hdrh.histogram.HdrHistogram.decode(in_list_1[3])
338             except hdrh.codec.HdrLengthException:
339                 hdr_lat_1 = None
340
341             in_list_2[3] += "=" * (len(in_list_2[3]) % 4)
342             try:
343                 hdr_lat_2 = hdrh.histogram.HdrHistogram.decode(in_list_2[3])
344             except hdrh.codec.HdrLengthException:
345                 hdr_lat_2 = None
346
347             if hdr_lat_1 and hdr_lat_2:
348                 hdr_lat = (
349                     hdr_lat_1.get_value_at_percentile(50.0),
350                     hdr_lat_1.get_value_at_percentile(90.0),
351                     hdr_lat_1.get_value_at_percentile(99.0),
352                     hdr_lat_2.get_value_at_percentile(50.0),
353                     hdr_lat_2.get_value_at_percentile(90.0),
354                     hdr_lat_2.get_value_at_percentile(99.0)
355                 )
356                 if all(hdr_lat):
357                     return hdr_lat
358
359             hdr_lat = (
360                 int(in_list_1[0]), int(in_list_1[1]), int(in_list_1[2]),
361                 int(in_list_2[0]), int(in_list_2[1]), int(in_list_2[2])
362             )
363             for item in hdr_lat:
364                 if item in (-1, 4294967295, 0):
365                     return None
366             return hdr_lat
367
368         try:
369             out_msg = (
370                 f"1. {(data['ndr_low'] / 1e6):5.2f}      "
371                 f"{data['ndr_low_b']:5.2f}"
372                 f"\n2. {(data['pdr_low'] / 1e6):5.2f}      "
373                 f"{data['pdr_low_b']:5.2f}"
374             )
375             latency = (
376                 _process_lat(data['pdr_lat_10_1'], data['pdr_lat_10_2']),
377                 _process_lat(data['pdr_lat_50_1'], data['pdr_lat_50_2']),
378                 _process_lat(data['pdr_lat_90_1'], data['pdr_lat_90_2'])
379             )
380             if all(latency):
381                 max_len = len(str(max((max(item) for item in latency))))
382                 max_len = 4 if max_len < 4 else max_len
383
384                 for idx, lat in enumerate(latency):
385                     if not idx:
386                         out_msg += "\n"
387                     out_msg += (
388                         f"\n{idx + 3}. "
389                         f"{lat[0]:{max_len}d} "
390                         f"{lat[1]:{max_len}d} "
391                         f"{lat[2]:{max_len}d}      "
392                         f"{lat[3]:{max_len}d} "
393                         f"{lat[4]:{max_len}d} "
394                         f"{lat[5]:{max_len}d} "
395                     )
396
397             return out_msg
398
399         except (AttributeError, IndexError, ValueError, KeyError):
400             return "Test Failed."
401
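    # Illustrative note (not part of the original module): each latency string
    # processed above has the shape "min/avg/max/hdrh", where the fourth field
    # is a base64-encoded HdrHistogram payload; splitting a hypothetical value:
    #
    # >>> "10/15/25/HISTFAAA".split('/', 3)
    # ['10', '15', '25', 'HISTFAAA']
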
402     def _get_testbed(self, msg):
403         """Called when extraction of testbed IP is required.
404         The testbed is identified by TG node IP address.
405
406         :param msg: Message to process.
407         :type msg: Message
408         :returns: Nothing.
409         """
410
411         if msg.message.count("Setup of TG node") or \
412                 msg.message.count("Setup of node TG host"):
413             reg_tg_ip = re.compile(
414                 r'.*TG .* (\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).*')
415             try:
416                 self._testbed = str(re.search(reg_tg_ip, msg.message).group(1))
417             except (KeyError, ValueError, IndexError, AttributeError):
418                 pass
419             finally:
420                 self._data["metadata"]["testbed"] = self._testbed
421
422     def _get_vpp_version(self, msg):
423         """Called when extraction of VPP version is required.
424
425         :param msg: Message to process.
426         :type msg: Message
427         :returns: Nothing.
428         """
429
430         if msg.message.count("VPP version:") or \
431                 msg.message.count("VPP Version:"):
432             self._version = str(
433                 re.search(self.REGEX_VERSION_VPP, msg.message).group(2)
434             )
435             self._data["metadata"]["version"] = self._version
436
437     def _get_dpdk_version(self, msg):
438         """Called when extraction of DPDK version is required.
439
440         :param msg: Message to process.
441         :type msg: Message
442         :returns: Nothing.
443         """
444
445         if msg.message.count("DPDK Version:"):
446             try:
447                 self._version = str(re.search(
448                     self.REGEX_VERSION_DPDK, msg.message).group(2))
449                 self._data["metadata"]["version"] = self._version
450             except IndexError:
451                 pass
452
453     def _get_papi_history(self, msg):
454         """Called when extraction of PAPI command history is required.
455
456         :param msg: Message to process.
457         :type msg: Message
458         :returns: Nothing.
459         """
460         if msg.message.count("PAPI command history:"):
461             self._conf_history_lookup_nr += 1
462             if self._conf_history_lookup_nr == 1:
463                 self._data["tests"][self._test_id]["conf-history"] = str()
464             text = re.sub(
465                 r"\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} PAPI command history:",
466                 "",
467                 msg.message,
468                 count=1
469             ).replace('"', "'")
470             self._data["tests"][self._test_id]["conf-history"] += \
471                 f"**DUT{str(self._conf_history_lookup_nr)}:** {text}"
472
473     def _get_show_run(self, msg):
474         """Called when extraction of VPP operational data (output of CLI command
475         Show Runtime) is required.
476
477         :param msg: Message to process.
478         :type msg: Message
479         :returns: Nothing.
480         """
481
482         if not msg.message.count("stats runtime"):
483             return
484
485         # Temporary solution
486         if self._sh_run_counter > 1:
487             return
488
489         if "show-run" not in self._data["tests"][self._test_id].keys():
490             self._data["tests"][self._test_id]["show-run"] = dict()
491
492         groups = re.search(self.REGEX_TC_PAPI_CLI, msg.message)
493         if not groups:
494             return
495         try:
496             host = groups.group(1)
497         except (AttributeError, IndexError):
498             host = ""
499         try:
500             sock = groups.group(2)
501         except (AttributeError, IndexError):
502             sock = ""
503
504         dut = "dut{nr}".format(
505             nr=len(self._data['tests'][self._test_id]['show-run'].keys()) + 1)
506
507         self._data['tests'][self._test_id]['show-run'][dut] = \
508             copy.copy(
509                 {
510                     "host": host,
511                     "socket": sock,
512                     "runtime": str(msg.message).replace(' ', '').
513                                 replace('\n', '').replace("'", '"').
514                                 replace('b"', '"').replace('u"', '"').
515                                 split(":", 1)[1]
516                 }
517             )
518
519     def _get_telemetry(self, msg):
520         """Called when extraction of VPP telemetry data is required.
521
522         :param msg: Message to process.
523         :type msg: Message
524         :returns: Nothing.
525         """
526
527         if self._telemetry_kw_counter > 1:
528             return
529         if not msg.message.count("# TYPE vpp_runtime_calls"):
530             return
531
532         if "telemetry-show-run" not in \
533                 self._data["tests"][self._test_id].keys():
534             self._data["tests"][self._test_id]["telemetry-show-run"] = dict()
535
536         self._telemetry_msg_counter += 1
537         groups = re.search(self.REGEX_SH_RUN_HOST, msg.message)
538         if not groups:
539             return
540         try:
541             host = groups.group(1)
542         except (AttributeError, IndexError):
543             host = ""
544         try:
545             sock = groups.group(2)
546         except (AttributeError, IndexError):
547             sock = ""
548         runtime = {
549             "source_type": "node",
550             "source_id": host,
551             "msg_type": "metric",
552             "log_level": "INFO",
553             "timestamp": msg.timestamp,
554             "msg": "show_runtime",
555             "host": host,
556             "socket": sock,
557             "data": list()
558         }
559         for line in msg.message.splitlines():
560             if not line.startswith("vpp_runtime_"):
561                 continue
562             try:
563                 params, value, timestamp = line.rsplit(" ", maxsplit=2)
564                 cut = params.index("{")
565                 name = params[:cut].split("_", maxsplit=2)[-1]
566                 labels = eval(
567                     "dict" + params[cut:].replace('{', '(').replace('}', ')')
568                 )
569                 labels["graph_node"] = labels.pop("name")
570                 runtime["data"].append(
571                     {
572                         "name": name,
573                         "value": value,
574                         "timestamp": timestamp,
575                         "labels": labels
576                     }
577                 )
578             except (TypeError, ValueError, IndexError):
579                 continue
580         self._data['tests'][self._test_id]['telemetry-show-run']\
581             [f"dut{self._telemetry_msg_counter}"] = copy.copy(
582                 {
583                     "host": host,
584                     "socket": sock,
585                     "runtime": runtime
586                 }
587             )
588
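    # Illustrative sketch (not part of the original module): each telemetry line
    # is a Prometheus-style sample; the hypothetical line below shows how the
    # code above splits it into metric name, labels, value and timestamp:
    #
    # >>> line = ('vpp_runtime_calls{name="ip4-lookup",state="active",'
    # ...         'thread_id="0"} 7.0 1658401234')
    # >>> params, value, timestamp = line.rsplit(" ", maxsplit=2)
    # >>> cut = params.index("{")
    # >>> params[:cut].split("_", maxsplit=2)[-1]
    # 'calls'
    # >>> eval("dict" + params[cut:].replace('{', '(').replace('}', ')'))
    # {'name': 'ip4-lookup', 'state': 'active', 'thread_id': '0'}
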
589     def _get_ndrpdr_throughput(self, msg):
590         """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
591         message.
592
593         :param msg: The test message to be parsed.
594         :type msg: str
595         :returns: Parsed data as a dict and the status (PASS/FAIL).
596         :rtype: tuple(dict, str)
597         """
598
599         throughput = {
600             "NDR": {"LOWER": -1.0, "UPPER": -1.0},
601             "PDR": {"LOWER": -1.0, "UPPER": -1.0}
602         }
603         status = "FAIL"
604         groups = re.search(self.REGEX_NDRPDR_RATE, msg)
605
606         if groups is not None:
607             try:
608                 throughput["NDR"]["LOWER"] = float(groups.group(1))
609                 throughput["NDR"]["UPPER"] = float(groups.group(2))
610                 throughput["PDR"]["LOWER"] = float(groups.group(3))
611                 throughput["PDR"]["UPPER"] = float(groups.group(4))
612                 status = "PASS"
613             except (IndexError, ValueError):
614                 pass
615
616         return throughput, status
617
618     def _get_ndrpdr_throughput_gbps(self, msg):
619         """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER in Gbps from the
620         test message.
621
622         :param msg: The test message to be parsed.
623         :type msg: str
624         :returns: Parsed data as a dict and the status (PASS/FAIL).
625         :rtype: tuple(dict, str)
626         """
627
628         gbps = {
629             "NDR": {"LOWER": -1.0, "UPPER": -1.0},
630             "PDR": {"LOWER": -1.0, "UPPER": -1.0}
631         }
632         status = "FAIL"
633         groups = re.search(self.REGEX_NDRPDR_GBPS, msg)
634
635         if groups is not None:
636             try:
637                 gbps["NDR"]["LOWER"] = float(groups.group(1))
638                 gbps["NDR"]["UPPER"] = float(groups.group(2))
639                 gbps["PDR"]["LOWER"] = float(groups.group(3))
640                 gbps["PDR"]["UPPER"] = float(groups.group(4))
641                 status = "PASS"
642             except (IndexError, ValueError):
643                 pass
644
645         return gbps, status
646
647     def _get_plr_throughput(self, msg):
648         """Get PLRsearch lower bound and PLRsearch upper bound from the test
649         message.
650
651         :param msg: The test message to be parsed.
652         :type msg: str
653         :returns: Parsed data as a dict and the status (PASS/FAIL).
654         :rtype: tuple(dict, str)
655         """
656
657         throughput = {
658             "LOWER": -1.0,
659             "UPPER": -1.0
660         }
661         status = "FAIL"
662         groups = re.search(self.REGEX_PLR_RATE, msg)
663
664         if groups is not None:
665             try:
666                 throughput["LOWER"] = float(groups.group(1))
667                 throughput["UPPER"] = float(groups.group(2))
668                 status = "PASS"
669             except (IndexError, ValueError):
670                 pass
671
672         return throughput, status
673
674     def _get_ndrpdr_latency(self, msg):
675         """Get LATENCY from the test message.
676
677         :param msg: The test message to be parsed.
678         :type msg: str
679         :returns: Parsed data as a dict and the status (PASS/FAIL).
680         :rtype: tuple(dict, str)
681         """
682         latency_default = {
683             "min": -1.0,
684             "avg": -1.0,
685             "max": -1.0,
686             "hdrh": ""
687         }
688         latency = {
689             "NDR": {
690                 "direction1": copy.copy(latency_default),
691                 "direction2": copy.copy(latency_default)
692             },
693             "PDR": {
694                 "direction1": copy.copy(latency_default),
695                 "direction2": copy.copy(latency_default)
696             },
697             "LAT0": {
698                 "direction1": copy.copy(latency_default),
699                 "direction2": copy.copy(latency_default)
700             },
701             "PDR10": {
702                 "direction1": copy.copy(latency_default),
703                 "direction2": copy.copy(latency_default)
704             },
705             "PDR50": {
706                 "direction1": copy.copy(latency_default),
707                 "direction2": copy.copy(latency_default)
708             },
709             "PDR90": {
710                 "direction1": copy.copy(latency_default),
711                 "direction2": copy.copy(latency_default)
712             },
713         }
714
715         groups = re.search(self.REGEX_NDRPDR_LAT, msg)
716         if groups is None:
717             groups = re.search(self.REGEX_NDRPDR_LAT_BASE, msg)
718         if groups is None:
719             return latency, "FAIL"
720
721         def process_latency(in_str):
722             """Return object with parsed latency values.
723
724             TODO: Define class for the return type.
725
726             :param in_str: Input string, min/avg/max/hdrh format.
727             :type in_str: str
728             :returns: Dict with corresponding keys; all values except hdrh are floats.
729             :rtype: dict
730             :throws IndexError: If in_str does not have enough substrings.
731             :throws ValueError: If a substring does not convert to float.
732             """
733             in_list = in_str.split('/', 3)
734
735             rval = {
736                 "min": float(in_list[0]),
737                 "avg": float(in_list[1]),
738                 "max": float(in_list[2]),
739                 "hdrh": ""
740             }
741
742             if len(in_list) == 4:
743                 rval["hdrh"] = str(in_list[3])
744
745             return rval
746
747         try:
748             latency["NDR"]["direction1"] = process_latency(groups.group(1))
749             latency["NDR"]["direction2"] = process_latency(groups.group(2))
750             latency["PDR"]["direction1"] = process_latency(groups.group(3))
751             latency["PDR"]["direction2"] = process_latency(groups.group(4))
752             if groups.lastindex == 4:
753                 return latency, "PASS"
754         except (IndexError, ValueError):
755             pass
756
757         try:
758             latency["PDR90"]["direction1"] = process_latency(groups.group(5))
759             latency["PDR90"]["direction2"] = process_latency(groups.group(6))
760             latency["PDR50"]["direction1"] = process_latency(groups.group(7))
761             latency["PDR50"]["direction2"] = process_latency(groups.group(8))
762             latency["PDR10"]["direction1"] = process_latency(groups.group(9))
763             latency["PDR10"]["direction2"] = process_latency(groups.group(10))
764             latency["LAT0"]["direction1"] = process_latency(groups.group(11))
765             latency["LAT0"]["direction2"] = process_latency(groups.group(12))
766             if groups.lastindex == 12:
767                 return latency, "PASS"
768         except (IndexError, ValueError):
769             pass
770
771         return latency, "FAIL"
772
773     @staticmethod
774     def _get_hoststack_data(msg, tags):
775         """Get data from the hoststack test message.
776
777         :param msg: The test message to be parsed.
778         :param tags: Test tags.
779         :type msg: str
780         :type tags: list
781         :returns: Parsed data as a JSON dict and the status (PASS/FAIL).
782         :rtype: tuple(dict, str)
783         """
784         result = dict()
785         status = "FAIL"
786
787         msg = msg.replace("'", '"').replace(" ", "")
788         if "LDPRELOAD" in tags:
789             try:
790                 result = loads(msg)
791                 status = "PASS"
792             except JSONDecodeError:
793                 pass
794         elif "VPPECHO" in tags:
795             try:
796                 msg_lst = msg.replace("}{", "} {").split(" ")
797                 result = dict(
798                     client=loads(msg_lst[0]),
799                     server=loads(msg_lst[1])
800                 )
801                 status = "PASS"
802             except (JSONDecodeError, IndexError):
803                 pass
804
805         return result, status
806
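    # Illustrative sketch (not part of the original module): for LDPRELOAD tests
    # the message is a JSON-like dict using single quotes; the keys below are
    # hypothetical:
    #
    # >>> ExecutionChecker._get_hoststack_data(
    # ...     "{'bandwidth': 12345.0, 'duration': 10.0}", ["LDPRELOAD"]
    # ... )
    # ({'bandwidth': 12345.0, 'duration': 10.0}, 'PASS')
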
807     def _get_vsap_data(self, msg, tags):
808         """Get data from the vsap test message.
809
810         :param msg: The test message to be parsed.
811         :param tags: Test tags.
812         :type msg: str
813         :type tags: list
814         :returns: Parsed data as a JSON dict and the status (PASS/FAIL).
815         :rtype: tuple(dict, str)
816         """
817         result = dict()
818         status = "FAIL"
819
820         groups = re.search(self.REGEX_VSAP_MSG_INFO, msg)
821         if groups is not None:
822             try:
823                 result["transfer-rate"] = float(groups.group(1)) * 1e3
824                 result["latency"] = float(groups.group(2))
825                 result["completed-requests"] = int(groups.group(3))
826                 result["failed-requests"] = int(groups.group(4))
827                 result["bytes-transferred"] = int(groups.group(5))
828                 if "TCP_CPS" in tags:
829                     result["cps"] = float(groups.group(6))
830                 elif "TCP_RPS" in tags:
831                     result["rps"] = float(groups.group(6))
832                 else:
833                     return result, status
834                 status = "PASS"
835             except (IndexError, ValueError):
836                 pass
837
838         return result, status
839
840     def visit_suite(self, suite):
841         """Implements traversing through the suite and its direct children.
842
843         :param suite: Suite to process.
844         :type suite: Suite
845         :returns: Nothing.
846         """
847         if self.start_suite(suite) is not False:
848             suite.setup.visit(self)
849             suite.suites.visit(self)
850             suite.tests.visit(self)
851             suite.teardown.visit(self)
852             self.end_suite(suite)
853
854     def start_suite(self, suite):
855         """Called when suite starts.
856
857         :param suite: Suite to process.
858         :type suite: Suite
859         :returns: Nothing.
860         """
861         try:
862             parent_name = suite.parent.name
863         except AttributeError:
864             return
865
866         self._data["suites"][suite.longname.lower().replace('"', "'").\
867             replace(" ", "_")] = {
868                 "name": suite.name.lower(),
869                 "doc": suite.doc,
870                 "parent": parent_name,
871                 "level": len(suite.longname.split("."))
872             }
873
874     def visit_test(self, test):
875         """Implements traversing through the test.
876
877         :param test: Test to process.
878         :type test: Test
879         :returns: Nothing.
880         """
881         if self.start_test(test) is not False:
882             test.setup.visit(self)
883             test.body.visit(self)
884             test.teardown.visit(self)
885             self.end_test(test)
886
887     def start_test(self, test):
888         """Called when test starts.
889
890         :param test: Test to process.
891         :type test: Test
892         :returns: Nothing.
893         """
894
895         self._sh_run_counter = 0
896         self._telemetry_kw_counter = 0
897         self._telemetry_msg_counter = 0
898
899         longname_orig = test.longname.lower()
900
901         # Check the ignore list
902         if longname_orig in self._ignore:
903             return
904
905         tags = [str(tag) for tag in test.tags]
906         test_result = dict()
907
908         # Change the TC long name and name if defined in the mapping table
909         longname = self._mapping.get(longname_orig, None)
910         if longname is not None:
911             name = longname.split('.')[-1]
912         else:
913             longname = longname_orig
914             name = test.name.lower()
915
916         # Remove TC number from the TC long name (backward compatibility):
917         self._test_id = re.sub(self.REGEX_TC_NUMBER, "", longname)
918         # Remove TC number from the TC name (not needed):
919         test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)
920
921         test_result["parent"] = test.parent.name.lower()
922         test_result["tags"] = tags
923         test_result["doc"] = test.doc
924         test_result["type"] = ""
925         test_result["status"] = test.status
926         test_result["starttime"] = test.starttime
927         test_result["endtime"] = test.endtime
928
929         if test.status == "PASS":
930             if "NDRPDR" in tags:
931                 if "TCP_PPS" in tags or "UDP_PPS" in tags:
932                     test_result["msg"] = self._get_data_from_pps_test_msg(
933                         test.message)
934                 elif "TCP_CPS" in tags or "UDP_CPS" in tags:
935                     test_result["msg"] = self._get_data_from_cps_test_msg(
936                         test.message)
937                 else:
938                     test_result["msg"] = self._get_data_from_perf_test_msg(
939                         test.message)
940             elif "MRR" in tags or "FRMOBL" in tags or "BMRR" in tags:
941                 test_result["msg"] = self._get_data_from_mrr_test_msg(
942                     test.message)
943             else:
944                 test_result["msg"] = test.message
945         else:
946             test_result["msg"] = test.message
947
948         if "PERFTEST" in tags and "TREX" not in tags:
949             # Replace info about cores (e.g. -1c-) with the info about threads
950             # and cores (e.g. -1t1c-) in the long test case names and in the
951             # test case names if necessary.
952             tag_count = 0
953             tag_tc = str()
954             for tag in test_result["tags"]:
955                 groups = re.search(self.REGEX_TC_TAG, tag)
956                 if groups:
957                     tag_count += 1
958                     tag_tc = tag
959
960             if tag_count == 1:
961                 self._test_id = re.sub(
962                     self.REGEX_TC_NAME_NEW, f"-{tag_tc.lower()}-",
963                     self._test_id, count=1
964                 )
965                 test_result["name"] = re.sub(
966                     self.REGEX_TC_NAME_NEW, f"-{tag_tc.lower()}-",
967                     test_result["name"], count=1
968                 )
969             else:
970                 test_result["status"] = "FAIL"
971                 self._data["tests"][self._test_id] = test_result
972                 logging.debug(
973                     f"The test {self._test_id} has none or more than one "
974                     f"multi-threading tag.\n"
975                     f"Tags: {test_result['tags']}"
976                 )
977                 return
978
979         if "DEVICETEST" in tags:
980             test_result["type"] = "DEVICETEST"
981         elif "NDRPDR" in tags:
982             if "TCP_CPS" in tags or "UDP_CPS" in tags:
983                 test_result["type"] = "CPS"
984             else:
985                 test_result["type"] = "NDRPDR"
986             if test.status == "PASS":
987                 test_result["throughput"], test_result["status"] = \
988                     self._get_ndrpdr_throughput(test.message)
989                 test_result["gbps"], test_result["status"] = \
990                     self._get_ndrpdr_throughput_gbps(test.message)
991                 test_result["latency"], test_result["status"] = \
992                     self._get_ndrpdr_latency(test.message)
993         elif "MRR" in tags or "FRMOBL" in tags or "BMRR" in tags:
994             if "MRR" in tags:
995                 test_result["type"] = "MRR"
996             else:
997                 test_result["type"] = "BMRR"
998             if test.status == "PASS":
999                 test_result["result"] = dict()
1000                 groups = re.search(self.REGEX_BMRR, test.message)
1001                 if groups is not None:
1002                     items_str = groups.group(1)
1003                     items_float = [
1004                         float(item.strip().replace("'", ""))
1005                         for item in items_str.split(",")
1006                     ]
1007                     # Use whole list in CSIT-1180.
1008                     stats = jumpavg.AvgStdevStats.for_runs(items_float)
1009                     test_result["result"]["samples"] = items_float
1010                     test_result["result"]["receive-rate"] = stats.avg
1011                     test_result["result"]["receive-stdev"] = stats.stdev
1012                 else:
1013                     groups = re.search(self.REGEX_MRR, test.message)
1014                     test_result["result"]["receive-rate"] = \
1015                         float(groups.group(3)) / float(groups.group(1))
1016         elif "SOAK" in tags:
1017             test_result["type"] = "SOAK"
1018             if test.status == "PASS":
1019                 test_result["throughput"], test_result["status"] = \
1020                     self._get_plr_throughput(test.message)
1021         elif "LDP_NGINX" in tags:
1022             test_result["type"] = "LDP_NGINX"
1023             test_result["result"], test_result["status"] = \
1024                 self._get_vsap_data(test.message, tags)
1025         elif "HOSTSTACK" in tags:
1026             test_result["type"] = "HOSTSTACK"
1027             if test.status == "PASS":
1028                 test_result["result"], test_result["status"] = \
1029                     self._get_hoststack_data(test.message, tags)
1030         elif "RECONF" in tags:
1031             test_result["type"] = "RECONF"
1032             if test.status == "PASS":
1033                 test_result["result"] = None
1034                 try:
1035                     grps_loss = re.search(self.REGEX_RECONF_LOSS, test.message)
1036                     grps_time = re.search(self.REGEX_RECONF_TIME, test.message)
1037                     test_result["result"] = {
1038                         "loss": int(grps_loss.group(1)),
1039                         "time": float(grps_time.group(1))
1040                     }
1041                 except (AttributeError, IndexError, ValueError, TypeError):
1042                     test_result["status"] = "FAIL"
1043         else:
1044             test_result["status"] = "FAIL"
1045
1046         self._data["tests"][self._test_id] = test_result
1047
1048     def visit_keyword(self, kw):
1049         """Implements traversing through the keyword and its child keywords.
1050
1051         :param kw: Keyword to process.
1052         :type kw: Keyword
1053         :returns: Nothing.
1054         """
1055         if self.start_keyword(kw) is not False:
1056             if hasattr(kw, "body"):
1057                 kw.body.visit(self)
1058             kw.teardown.visit(self)
1059             self.end_keyword(kw)
1060
1061     def start_keyword(self, keyword):
1062         """Called when a keyword starts. Stores the name of the keyword.
1063
1064         :param keyword: Keyword to process.
1065         :type keyword: Keyword
1066         :returns: Nothing.
1067         """
1068         self._kw_name = keyword.name
1069
1070     def end_keyword(self, keyword):
1071         """Called when a keyword ends. Clears the stored keyword name.
1072
1073         :param keyword: Keyword to process.
1074         :type keyword: Keyword
1075         :returns: Nothing.
1076         """
1077         _ = keyword
1078         self._kw_name = None
1079
1080     def visit_message(self, msg):
1081         """Implements visiting the message.
1082
1083         :param msg: Message to process.
1084         :type msg: Message
1085         :returns: Nothing.
1086         """
1087         if self.start_message(msg) is not False:
1088             self.end_message(msg)
1089
1090     def start_message(self, msg):
1091         """Called when a message starts. Gets the required information from
1092         the message, based on the currently processed keyword.
1093
1094         :param msg: Message to process.
1095         :type msg: Message
1096         :returns: Nothing.
1097         """
1098         if self._kw_name is None:
1099             return
1100         elif self._kw_name.count("Run Telemetry On All Duts"):
1101             self._telemetry_kw_counter += 1
1102             self._get_telemetry(msg)
1103         elif self._kw_name.count("Show Runtime On All Duts"):
1104             self._sh_run_counter += 1
1105             self._get_show_run(msg)
1106         elif self._kw_name.count("Show Vpp Version On All Duts"):
1107             if not self._version:
1108                 self._get_vpp_version(msg)
1109         elif self._kw_name.count("Install Dpdk Framework On All Duts"):
1110             if not self._version:
1111                 self._get_dpdk_version(msg)
1112         elif self._kw_name.count("Setup Framework"):
1113             if not self._testbed:
1114                 self._get_testbed(msg)
1115         elif self._kw_name.count("Show Papi History On All Duts"):
1116             self._conf_history_lookup_nr = 0
1117             self._get_papi_history(msg)
1118
1119
1120 class InputData:
1121     """Input data
1122
1123     The data is extracted from output.xml files generated by Jenkins jobs and
1124     stored in pandas' Series.
1125
1126     The data structure:
1127     - job name
1128       - build number
1129         - metadata
1130           (as described in ExecutionChecker documentation)
1131         - suites
1132           (as described in ExecutionChecker documentation)
1133         - tests
1134           (as described in ExecutionChecker documentation)
1135     """
1136
1137     def __init__(self, spec, for_output):
1138         """Initialization.
1139
1140         :param spec: Specification.
1141         :param for_output: Output to be generated from downloaded data.
1142         :type spec: Specification
1143         :type for_output: str
1144         """
1145
1146         # Specification:
1147         self._cfg = spec
1148
1149         self._for_output = for_output
1150
1151         # Data store:
1152         self._input_data = pd.Series(dtype="float64")
1153
1154     @property
1155     def data(self):
1156         """Getter - Input data.
1157
1158         :returns: Input data
1159         :rtype: pandas.Series
1160         """
1161         return self._input_data
1162
1163     def metadata(self, job, build):
1164         """Getter - metadata
1165
1166         :param job: Job whose metadata we want.
1167         :param build: Build whose metadata we want.
1168         :type job: str
1169         :type build: str
1170         :returns: Metadata
1171         :rtype: pandas.Series
1172         """
1173         return self.data[job][build]["metadata"]
1174
1175     def suites(self, job, build):
1176         """Getter - suites
1177
1178         :param job: Job whose suites we want.
1179         :param build: Build whose suites we want.
1180         :type job: str
1181         :type build: str
1182         :returns: Suites.
1183         :rtype: pandas.Series
1184         """
1185         return self.data[job][str(build)]["suites"]
1186
1187     def tests(self, job, build):
1188         """Getter - tests
1189
1190         :param job: Job whose tests we want.
1191         :param build: Build whose tests we want.
1192         :type job: str
1193         :type build: str
1194         :returns: Tests.
1195         :rtype: pandas.Series
1196         """
1197         return self.data[job][build]["tests"]
1198
1199     def _parse_tests(self, job, build):
1200         """Process data from robot output.xml file and return JSON structured
1201         data.
1202
1203         :param job: Name of the job whose build output data will be processed.
1204         :param build: The build whose output data will be processed.
1205         :type job: str
1206         :type build: dict
1207         :returns: JSON data structure.
1208         :rtype: dict
1209         """
1210
1211         metadata = {
1212             "job": job,
1213             "build": build
1214         }
1215
1216         with open(build["file-name"], 'r') as data_file:
1217             try:
1218                 result = ExecutionResult(data_file)
1219             except errors.DataError as err:
1220                 logging.error(
1221                     f"Error occurred while parsing output.xml: {repr(err)}"
1222                 )
1223                 return None
1224
1225         process_oper = False
1226         if "-vpp-perf-report-coverage-" in job:
1227             process_oper = True
1228         # elif "-vpp-perf-report-iterative-" in job:
1229         #     # Exceptions for TBs where we do not have coverage data:
1230         #     for item in ("-2n-icx", ):
1231         #         if item in job:
1232         #             process_oper = True
1233         #             break
1234         checker = ExecutionChecker(
1235             metadata, self._cfg.mapping, self._cfg.ignore, process_oper
1236         )
1237         result.visit(checker)
1238
1239         checker.data["metadata"]["tests_total"] = \
1240             result.statistics.total.total
1241         checker.data["metadata"]["tests_passed"] = \
1242             result.statistics.total.passed
1243         checker.data["metadata"]["tests_failed"] = \
1244             result.statistics.total.failed
1245         checker.data["metadata"]["elapsedtime"] = result.suite.elapsedtime
1246         checker.data["metadata"]["generated"] = result.suite.endtime[:14]
1247
1248         return checker.data
1249
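    # Illustrative sketch (not part of the original module): the core of
    # _parse_tests can also be driven by hand on a local output.xml
    # ("output.xml" is a hypothetical path):
    #
    #     result = ExecutionResult("output.xml")
    #     checker = ExecutionChecker({"job": "local", "build": "1"}, {}, [], True)
    #     result.visit(checker)
    #     sorted(checker.data.keys())  # ['metadata', 'suites', 'tests']
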
1250     def _download_and_parse_build(self, job, build, repeat, pid=10000):
1251         """Download and parse the input data file.
1252
1253         :param pid: PID of the process executing this method.
1254         :param job: Name of the Jenkins job which generated the processed input
1255             file.
1256         :param build: Information about the Jenkins build which generated the
1257             processed input file.
1258         :param repeat: Repeat the download the specified number of times if
1259             not successful.
1260         :type pid: int
1261         :type job: str
1262         :type build: dict
1263         :type repeat: int
1264         """
1265
1266         logging.info(f"Processing the job/build: {job}: {build['build']}")
1267
1268         state = "failed"
1269         success = False
1270         data = None
1271         do_repeat = repeat
1272         while do_repeat:
1273             success = download_and_unzip_data_file(self._cfg, job, build, pid)
1274             if success:
1275                 break
1276             do_repeat -= 1
1277         if not success:
1278             logging.error(
1279                 f"It is not possible to download the input data file from the "
1280                 f"job {job}, build {build['build']}, or it is damaged. "
1281                 f"Skipped."
1282             )
1283         if success:
1284             logging.info(f"  Processing data from build {build['build']}")
1285             data = self._parse_tests(job, build)
1286             if data is None:
1287                 logging.error(
1288                     f"Input data file from the job {job}, build "
1289                     f"{build['build']} is damaged. Skipped."
1290                 )
1291             else:
1292                 state = "processed"
1293
1294             try:
1295                 remove(build["file-name"])
1296             except OSError as err:
1297                 logging.error(
1298                     f"Cannot remove the file {build['file-name']}: {repr(err)}"
1299                 )
1300
1301         # If the time-period is defined in the specification file, remove all
1302         # files which are outside the time period.
1303         is_last = False
1304         timeperiod = self._cfg.environment.get("time-period", None)
1305         if timeperiod and data:
1306             now = dt.utcnow()
1307             timeperiod = timedelta(int(timeperiod))
1308             metadata = data.get("metadata", None)
1309             if metadata:
1310                 generated = metadata.get("generated", None)
1311                 if generated:
1312                     generated = dt.strptime(generated, "%Y%m%d %H:%M")
1313                     if (now - generated) > timeperiod:
1314                         # Remove the data and the file:
1315                         state = "removed"
1316                         data = None
1317                         is_last = True
1318                         logging.info(
1319                             f"  The build {job}/{build['build']} is "
1320                             f"outdated, will be removed."
1321                         )
1322         return {
1323             "data": data,
1324             "state": state,
1325             "job": job,
1326             "build": build,
1327             "last": is_last
1328         }
1329
1330     def download_and_parse_data(self, repeat=1):
1331         """Download the input data files, parse input data from input files and
1332         store in pandas' Series.
1333
1334         :param repeat: Repeat the download the specified number of times if
1335             not successful.
1336         :type repeat: int
1337         """
1338
1339         logging.info("Downloading and parsing input files ...")
1340
1341         for job, builds in self._cfg.input.items():
1342             for build in builds:
1343
1344                 result = self._download_and_parse_build(job, build, repeat)
1345                 if result["last"]:
1346                     break
1347                 build_nr = result["build"]["build"]
1348
1349                 if result["data"]:
1350                     data = result["data"]
1351                     build_data = pd.Series({
1352                         "metadata": pd.Series(
1353                             list(data["metadata"].values()),
1354                             index=list(data["metadata"].keys())
1355                         ),
1356                         "suites": pd.Series(
1357                             list(data["suites"].values()),
1358                             index=list(data["suites"].keys())
1359                         ),
1360                         "tests": pd.Series(
1361                             list(data["tests"].values()),
1362                             index=list(data["tests"].keys())
1363                         )
1364                     })
1365
1366                     if self._input_data.get(job, None) is None:
1367                         self._input_data[job] = pd.Series(dtype="float64")
1368                     self._input_data[job][str(build_nr)] = build_data
1369                     self._cfg.set_input_file_name(
1370                         job, build_nr, result["build"]["file-name"]
1371                     )
1372                 self._cfg.set_input_state(job, build_nr, result["state"])
1373
1374                 mem_alloc = \
1375                     resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
1376                 logging.info(f"Memory allocation: {mem_alloc:.0f}MB")
1377
1378         logging.info("Done.")
1379
1380         msg = "Successful downloads from the sources:\n"
1381         for source in self._cfg.environment["data-sources"]:
1382             if source["successful-downloads"]:
1383                 msg += (
1384                     f"{source['url']}/{source['path']}/"
1385                     f"{source['file-name']}: "
1386                     f"{source['successful-downloads']}\n"
1387                 )
1388         logging.info(msg)
1389
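    # Illustrative usage sketch (not part of the original module): "spec" stands
    # in for a real Specification object and the for_output value is
    # hypothetical; the whole download-and-parse cycle is then driven by:
    #
    #     input_data = InputData(spec, for_output="report")
    #     input_data.download_and_parse_data(repeat=2)
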
1390     def process_local_file(self, local_file, job="local", build_nr=1,
1391                            replace=True):
1392         """Process local XML file given as a command-line parameter.
1393
1394         :param local_file: The file to process.
1395         :param job: Job name.
1396         :param build_nr: Build number.
1397         :param replace: If True, the information about jobs and builds is
1398             replaced by the new one, otherwise the new jobs and builds are
1399             added.
1400         :type local_file: str
1401         :type job: str
1402         :type build_nr: int
1403         :type replace: bool
1404         :raises: PresentationError if an error occurs.
1405         """
1406         if not isfile(local_file):
1407             raise PresentationError(f"The file {local_file} does not exist.")
1408
1409         try:
1410             build_nr = int(local_file.split("/")[-1].split(".")[0])
1411         except (IndexError, ValueError):
1412             pass
1413
1414         build = {
1415             "build": build_nr,
1416             "status": "failed",
1417             "file-name": local_file
1418         }
1419         if replace:
1420             self._cfg.input = dict()
1421         self._cfg.add_build(job, build)
1422
1423         logging.info(f"Processing {job}: {build_nr:2d}: {local_file}")
1424         data = self._parse_tests(job, build)
1425         if data is None:
1426             raise PresentationError(
1427                 f"Error occurred while parsing the file {local_file}"
1428             )
1429
1430         build_data = pd.Series({
1431             "metadata": pd.Series(
1432                 list(data["metadata"].values()),
1433                 index=list(data["metadata"].keys())
1434             ),
1435             "suites": pd.Series(
1436                 list(data["suites"].values()),
1437                 index=list(data["suites"].keys())
1438             ),
1439             "tests": pd.Series(
1440                 list(data["tests"].values()),
1441                 index=list(data["tests"].keys())
1442             )
1443         })
1444
1445         if self._input_data.get(job, None) is None:
1446             self._input_data[job] = pd.Series(dtype="float64")
1447         self._input_data[job][str(build_nr)] = build_data
1448
1449         self._cfg.set_input_state(job, build_nr, "processed")
1450
1451     def process_local_directory(self, local_dir, replace=True):
1452         """Process a local directory with XML file(s). The directory is
1453         processed as a 'job' and the XML files in it as builds.
1454         If the given directory contains only sub-directories, these
1455         sub-directories are processed as jobs and the corresponding XML files
1456         as builds of their job.
1457
1458         :param local_dir: Local directory to process.
1459         :param replace: If True, the information about jobs and builds is
1460             replaced by the new one; otherwise, the new jobs and builds are
1461             added to the existing ones.
1462         :type local_dir: str
1463         :type replace: bool
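             :raises: PresentationError if the directory does not exist, is
                 empty, or contains both files and sub-directories.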
1464         """
1465         if not isdir(local_dir):
1466             raise PresentationError(
1467                 f"The directory {local_dir} does not exist."
1468             )
1469
1470         # Check if the given directory includes only files, or only directories
1471         _, dirnames, filenames = next(walk(local_dir))
1472
1473         if filenames and not dirnames:
1474             filenames.sort()
1475             # local_builds:
1476             # key: dir (job) name, value: list of file names (builds)
1477             local_builds = {
1478                 local_dir: [join(local_dir, name) for name in filenames]
1479             }
1480
1481         elif dirnames and not filenames:
1482             dirnames.sort()
1483             # local_builds:
1484             # key: dir (job) name, value: list of file names (builds)
1485             local_builds = dict()
1486             for dirname in dirnames:
1487                 builds = [
1488                     join(local_dir, dirname, name)
1489                     for name in listdir(join(local_dir, dirname))
1490                     if isfile(join(local_dir, dirname, name))
1491                 ]
1492                 if builds:
1493                     local_builds[dirname] = sorted(builds)
1494
1495         elif not filenames and not dirnames:
1496             raise PresentationError(f"The directory {local_dir} is empty.")
1497         else:
1498             raise PresentationError(
1499                 f"The directory {local_dir} can include only files or only "
1500                 f"directories, not both.\nThe directory {local_dir} includes "
1501                 f"file(s):\n{filenames}\nand directories:\n{dirnames}"
1502             )
1503
1504         if replace:
1505             self._cfg.input = dict()
1506
1507         for job, files in local_builds.items():
1508             for idx, local_file in enumerate(files):
1509                 self.process_local_file(local_file, job, idx + 1, replace=False)
1510
1511     @staticmethod
1512     def _end_of_tag(tag_filter, start=0, closer="'"):
1513         """Return the index of the character in the string which ends the tag.
1514
1515         :param tag_filter: The string to search for the end of the tag.
1516         :param start: The index where the searching is started.
1517         :param closer: The character which is the tag closer.
1518         :type tag_filter: str
1519         :type start: int
1520         :type closer: str
1521         :returns: The index of the tag closer or None if it is not found.
1522         :rtype: int
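
             Example (illustrative): for tag_filter "'64B' and '1T1C'" and
             start 0, the returned index is 4, i.e. the apostrophe which
             closes '64B'.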
1523         """
1524         try:
1525             idx_opener = tag_filter.index(closer, start)
1526             return tag_filter.index(closer, idx_opener + 1)
1527         except ValueError:
1528             return None
1529
1530     @staticmethod
1531     def _condition(tag_filter):
1532         """Create a conditional statement from the given tag filter.
1533
1534         :param tag_filter: Filter based on tags from the element specification.
1535         :type tag_filter: str
1536         :returns: Conditional statement which can be evaluated.
1537         :rtype: str
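
             Example (illustrative): the filter "'64B' and '1T1C'" is
             transformed into "'64B' in tags and '1T1C' in tags", which can
             then be evaluated against the set of tags of a test.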
1538         """
1539         index = 0
1540         while True:
1541             index = InputData._end_of_tag(tag_filter, index)
1542             if index is None:
1543                 return tag_filter
1544             index += 1
1545             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1546
1547     def filter_data(self, element, params=None, data=None, data_set="tests",
1548                     continue_on_error=False):
1549         """Filter required data from the given jobs and builds.
1550
1551         The output data structure is:
1552         - job 1
1553           - build 1
1554             - test (or suite) 1 ID:
1555               - param 1
1556               - param 2
1557               ...
1558               - param n
1559             ...
1560             - test (or suite) n ID:
1561             ...
1562           ...
1563           - build n
1564         ...
1565         - job n
1566
1567         :param element: Element which will use the filtered data.
1568         :param params: Parameters which will be included in the output. If None,
1569             all parameters are included.
1570         :param data: If not None, this data is used instead of data specified
1571             in the element.
1572         :param data_set: The set of data to be filtered: tests, suites,
1573             metadata.
1574         :param continue_on_error: Continue if there is an error while reading
1575             the data. The item will be empty then.
1576         :type element: pandas.Series
1577         :type params: list
1578         :type data: dict
1579         :type data_set: str
1580         :type continue_on_error: bool
1581         :returns: Filtered data.
1582         :rtype: pandas.Series
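
             Example (illustrative): with the element filter
             "'NDRPDR' and '1T1C'" and the element parameters
             ["throughput", "parent", "tags"], only tests tagged with both
             NDRPDR and 1T1C are returned, each reduced to the listed
             parameters plus "type" and "status".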
1583         """
1584
1585         try:
1586             if data_set == "suites":
1587                 cond = "True"
1588             elif element["filter"] in ("all", "template"):
1589                 cond = "True"
1590             else:
1591                 cond = InputData._condition(element["filter"])
1592             logging.debug(f"   Filter: {cond}")
1593         except KeyError:
1594             logging.error("  No filter defined.")
1595             return None
1596
1597         if params is None:
1598             params = element.get("parameters", None)
1599             if params:
1600                 params.extend(("type", "status"))
1601
1602         data_to_filter = data if data else element["data"]
1603         data = pd.Series(dtype="float64")
1604         try:
1605             for job, builds in data_to_filter.items():
1606                 data[job] = pd.Series(dtype="float64")
1607                 for build in builds:
1608                     data[job][str(build)] = pd.Series(dtype="float64")
1609                     try:
1610                         data_dict = dict(
1611                             self.data[job][str(build)][data_set].items())
1612                     except KeyError:
1613                         if continue_on_error:
1614                             continue
1615                         return None
1616
1617                     for test_id, test_data in data_dict.items():
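                             # Evaluate the tag filter with the test's tags
                             # bound to the name "tags".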
1618                         if eval(cond, {"tags": test_data.get("tags", "")}):
1619                             data[job][str(build)][test_id] = \
1620                                 pd.Series(dtype="float64")
1621                             if params is None:
1622                                 for param, val in test_data.items():
1623                                     data[job][str(build)][test_id][param] = val
1624                             else:
1625                                 for param in params:
1626                                     try:
1627                                         data[job][str(build)][test_id][param] =\
1628                                             test_data[param]
1629                                     except KeyError:
1630                                         data[job][str(build)][test_id][param] =\
1631                                             "No Data"
1632             return data
1633
1634         except (KeyError, IndexError, ValueError) as err:
1635             logging.error(
1636                 f"Missing mandatory parameter in the element specification: "
1637                 f"{repr(err)}"
1638             )
1639             return None
1640         except AttributeError as err:
1641             logging.error(repr(err))
1642             return None
1643         except SyntaxError as err:
1644             logging.error(
1645                 f"The filter {cond} is not correct. Check if all tags are "
1646                 f"enclosed by apostrophes.\n{repr(err)}"
1647             )
1648             return None
1649
1650     def filter_tests_by_name(self, element, params=None, data_set="tests",
1651                              continue_on_error=False):
1652         """Filter required data from the given jobs and builds by test name.
1653
1654         The output data structure is:
1655         - job 1
1656           - build 1
1657             - test (or suite) 1 ID:
1658               - param 1
1659               - param 2
1660               ...
1661               - param n
1662             ...
1663             - test (or suite) n ID:
1664             ...
1665           ...
1666           - build n
1667         ...
1668         - job n
1669
1670         :param element: Element which will use the filtered data.
1671         :param params: Parameters which will be included in the output. If
1672             None, all parameters are included.
1673         :param data_set: The set of data to be filtered: tests, suites,
1674             metadata.
1675         :param continue_on_error: Continue if there is an error while reading
1676             the data. The item will be empty then.
1677         :type element: pandas.Series
1678         :type params: list
1679         :type data_set: str
1680         :type continue_on_error: bool
1681         :returns: Filtered data.
1682         :rtype: pandas.Series
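
             Example (illustrative): with include
             ["ethip4-ip4base-{core}-ndrpdr"] and core ["1c", "2c"], the
             patterns "ethip4-ip4base-1c-ndrpdr" and "ethip4-ip4base-2c-ndrpdr"
             are used as case-insensitive regular expressions matched against
             the test IDs.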
1683         """
1684
1685         include = element.get("include", None)
1686         if not include:
1687             logging.warning("No tests to include, skipping the element.")
1688             return None
1689
1690         if params is None:
1691             params = element.get("parameters", None)
1692             if params and "type" not in params:
1693                 params.append("type")
1694
1695         cores = element.get("core", None)
1696         if cores:
1697             tests = list()
1698             for core in cores:
1699                 for test in include:
1700                     tests.append(test.format(core=core))
1701         else:
1702             tests = include
1703
1704         data = pd.Series(dtype="float64")
1705         try:
1706             for job, builds in element["data"].items():
1707                 data[job] = pd.Series(dtype="float64")
1708                 for build in builds:
1709                     data[job][str(build)] = pd.Series(dtype="float64")
1710                     for test in tests:
1711                         try:
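                                 # Treat each requested test name as a
                                 # case-insensitive regular expression and
                                 # match it against the item IDs.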
1712                             reg_ex = re.compile(str(test).lower())
1713                             for test_id in self.data[job][
1714                                     str(build)][data_set].keys():
1715                                 if re.match(reg_ex, str(test_id).lower()):
1716                                     test_data = self.data[job][
1717                                         str(build)][data_set][test_id]
1718                                     data[job][str(build)][test_id] = \
1719                                         pd.Series(dtype="float64")
1720                                     if params is None:
1721                                         for param, val in test_data.items():
1722                                             data[job][str(build)][test_id]\
1723                                                 [param] = val
1724                                     else:
1725                                         for param in params:
1726                                             try:
1727                                                 data[job][str(build)][
1728                                                     test_id][param] = \
1729                                                     test_data[param]
1730                                             except KeyError:
1731                                                 data[job][str(build)][
1732                                                     test_id][param] = "No Data"
1733                         except KeyError as err:
1734                             if continue_on_error:
1735                                 logging.debug(repr(err))
1736                                 continue
1737                             logging.error(repr(err))
1738                             return None
1739             return data
1740
1741         except (KeyError, IndexError, ValueError) as err:
1742             logging.error(
1743                 f"Missing mandatory parameter in the element "
1744                 f"specification: {repr(err)}"
1745             )
1746             return None
1747         except AttributeError as err:
1748             logging.error(repr(err))
1749             return None
1750
1751     @staticmethod
1752     def merge_data(data):
1753         """Merge data from multiple jobs and builds into one data structure.
1754
1755         The output data structure is:
1756
1757         - test (suite) 1 ID:
1758           - param 1
1759           - param 2
1760           ...
1761           - param n
1762         ...
1763         - test (suite) n ID:
1764         ...
1765
1766         :param data: Data to merge.
1767         :type data: pandas.Series
1768         :returns: Merged data.
1769         :rtype: pandas.Series
1770         """
1771
1772         logging.info("    Merging data ...")
1773
1774         merged_data = pd.Series(dtype="float64")
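             # If the same item ID appears in more than one build, the last
             # processed build overwrites the previous value.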
1775         for builds in data.values:
1776             for item in builds.values:
1777                 for item_id, item_data in item.items():
1778                     merged_data[item_id] = item_data
1779         return merged_data
1780
1781     def print_all_oper_data(self):
1782         """Print all operational data to the console.
1783         """
1784
1785         for job in self._input_data.values:
1786             for build in job.values:
1787                 for test_id, test_data in build["tests"].items():
1788                     print(f"{test_id}")
1789                     if test_data.get("show-run", None) is None:
1790                         continue
1791                     for dut_name, data in test_data["show-run"].items():
1792                         if data.get("runtime", None) is None:
1793                             continue
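                             # The "runtime" entry is stored as a JSON string;
                             # decode it into a list of per-node records.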
1794                         runtime = loads(data["runtime"])
1795                         try:
1796                             threads_nr = len(runtime[0]["clocks"])
1797                         except (IndexError, KeyError):
1798                             continue
1799                         threads = OrderedDict(
1800                             {idx: list() for idx in range(threads_nr)})
1801                         for item in runtime:
1802                             for idx in range(threads_nr):
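                                     # Clocks per packet: prefer vectors as
                                     # the divisor, then calls, then suspends;
                                     # zero if the node did no work.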
1803                                 if item["vectors"][idx] > 0:
1804                                     clocks = item["clocks"][idx] / \
1805                                              item["vectors"][idx]
1806                                 elif item["calls"][idx] > 0:
1807                                     clocks = item["clocks"][idx] / \
1808                                              item["calls"][idx]
1809                                 elif item["suspends"][idx] > 0:
1810                                     clocks = item["clocks"][idx] / \
1811                                              item["suspends"][idx]
1812                                 else:
1813                                     clocks = 0.0
1814
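                                     # Average vector size: packets (vectors)
                                     # per call.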
1815                                 if item["calls"][idx] > 0:
1816                                     vectors_call = item["vectors"][idx] / \
1817                                                    item["calls"][idx]
1818                                 else:
1819                                     vectors_call = 0.0
1820
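                                     # Show only nodes with some activity
                                     # (non-zero calls, vectors or suspends).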
1821                                 if int(item["calls"][idx]) + int(
1822                                         item["vectors"][idx]) + \
1823                                         int(item["suspends"][idx]):
1824                                     threads[idx].append([
1825                                         item["name"],
1826                                         item["calls"][idx],
1827                                         item["vectors"][idx],
1828                                         item["suspends"][idx],
1829                                         clocks,
1830                                         vectors_call
1831                                     ])
1832
1833                         print(f"Host IP: {data.get('host', '')}, "
1834                               f"Socket: {data.get('socket', '')}")
1835                         for thread_nr, thread in threads.items():
1836                             txt_table = prettytable.PrettyTable(
1837                                 (
1838                                     "Name",
1839                                     "Nr of Vectors",
1840                                     "Nr of Packets",
1841                                     "Suspends",
1842                                     "Cycles per Packet",
1843                                     "Average Vector Size"
1844                                 )
1845                             )
1846                             avg = 0.0
1847                             for row in thread:
1848                                 txt_table.add_row(row)
1849                                 avg += row[-1]
1850                             if len(thread) == 0:
1851                                 avg = ""
1852                             else:
1853                                 avg = f", Average Vector Size per Node: " \
1854                                       f"{(avg / len(thread)):.2f}"
1855                             th_name = "main" if thread_nr == 0 \
1856                                 else f"worker_{thread_nr}"
1857                             print(f"{dut_name}, {th_name}{avg}")
1858                             txt_table.float_format = ".2"
1859                             txt_table.align = "r"
1860                             txt_table.align["Name"] = "l"
1861                             print(f"{txt_table.get_string()}\n")