# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from os.path import join
34 from datetime import datetime as dt
35 from datetime import timedelta
36 from json import loads
37 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
38
39 from input_data_files import download_and_unzip_data_file
40 from utils import Worker


# Separator used in file names
SEPARATOR = "__"
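# For illustration, downloaded input files are stored under names composed
# with this separator, e.g. "<job>__<build>__output.xml" (a hypothetical
# name; the actual file name comes from the specification).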


class ExecutionChecker(ResultVisitor):
    """Class to traverse through the test suite structure.

    The functionality implemented in this class generates a JSON structure:

    Performance tests:

    {
        "metadata": {
            "generated": "Timestamp",
            "version": "SUT version",
            "job": "Jenkins job name",
            "build": "Information about the build"
        },
        "suites": {
            "Suite long name 1": {
                "name": "Suite name",
                "doc": "Suite 1 documentation",
                "parent": "Suite 1 parent",
                "level": "Level of the suite in the suite hierarchy"
            },
            "Suite long name N": {
                "name": "Suite name",
                "doc": "Suite N documentation",
                "parent": "Suite N parent",
                "level": "Level of the suite in the suite hierarchy"
            }
        },
        "tests": {
            # NDRPDR tests:
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "vat-history": "DUT1 and DUT2 VAT History",
                "show-run": "Show Run",
                "tags": ["tag 1", "tag 2", "tag n"],
                "type": "NDRPDR",
                "status": "PASS" | "FAIL",
                "throughput": {
                    "NDR": {
                        "LOWER": float,
                        "UPPER": float
                    },
                    "PDR": {
                        "LOWER": float,
                        "UPPER": float
                    }
                },
                "latency": {
                    "NDR": {
                        "direction1": {
                            "min": float,
                            "avg": float,
                            "max": float
                        },
                        "direction2": {
                            "min": float,
                            "avg": float,
                            "max": float
                        }
                    },
                    "PDR": {
                        "direction1": {
                            "min": float,
                            "avg": float,
                            "max": float
                        },
                        "direction2": {
                            "min": float,
                            "avg": float,
                            "max": float
                        }
                    }
                }
            },

            # TCP tests:
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "type": "TCP",
                "status": "PASS" | "FAIL",
                "result": int
            },

            # MRR, BMRR tests:
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "type": "MRR" | "BMRR",
                "status": "PASS" | "FAIL",
                "result": {
                    "receive-rate": AvgStdevMetadata,
                }
            },

            # TODO: Remove when definitely no NDRPDRDISC tests are used:
            # NDRPDRDISC tests:
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "type": "PDR" | "NDR",
                "status": "PASS" | "FAIL",
                "throughput": {  # Only type: "PDR" | "NDR"
                    "value": int,
                    "unit": "pps" | "bps" | "percentage"
                },
                "latency": {  # Only type: "PDR" | "NDR"
                    "direction1": {
                        "100": {
                            "min": int,
                            "avg": int,
                            "max": int
                        },
                        "50": {  # Only for NDR
                            "min": int,
                            "avg": int,
                            "max": int
                        },
                        "10": {  # Only for NDR
                            "min": int,
                            "avg": int,
                            "max": int
                        }
                    },
                    "direction2": {
                        "100": {
                            "min": int,
                            "avg": int,
                            "max": int
                        },
                        "50": {  # Only for NDR
                            "min": int,
                            "avg": int,
                            "max": int
                        },
                        "10": {  # Only for NDR
                            "min": int,
                            "avg": int,
                            "max": int
                        }
                    }
                },
                "lossTolerance": "lossTolerance",  # Only type: "PDR"
                "vat-history": "DUT1 and DUT2 VAT History",
                "show-run": "Show Run"
            },
            "ID": {
                # next test
            }
        }
    }


    Functional tests:

    {
        "metadata": {  # Optional
            "version": "VPP version",
            "job": "Jenkins job name",
            "build": "Information about the build"
        },
        "suites": {
            "Suite name 1": {
                "doc": "Suite 1 documentation",
                "parent": "Suite 1 parent",
                "level": "Level of the suite in the suite hierarchy"
            },
            "Suite name N": {
                "doc": "Suite N documentation",
                "parent": "Suite N parent",
                "level": "Level of the suite in the suite hierarchy"
            }
        },
        "tests": {
            "ID": {
                "name": "Test name",
                "parent": "Name of the parent of the test",
                "doc": "Test documentation",
                "msg": "Test message",
                "tags": ["tag 1", "tag 2", "tag n"],
                "vat-history": "DUT1 and DUT2 VAT History",
                "show-run": "Show Run",
                "status": "PASS" | "FAIL"
            },
            "ID": {
                # next test
            }
        }
    }

    .. note:: ID is the lowercase full path to the test.
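
    For illustration, an instance of this checker is typically driven by
    Robot Framework's result visitor API (a minimal sketch; the empty
    metadata, mapping and ignore arguments are placeholders)::

        from robot.api import ExecutionResult

        result = ExecutionResult("output.xml")
        checker = ExecutionChecker(metadata={}, mapping={}, ignore=[])
        result.visit(checker)
        data = checker.data  # the JSON-like structure described above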
    """

    # TODO: Remove when definitely no NDRPDRDISC tests are used:
    REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')

    REGEX_NDRPDR_RATE = re.compile(r'NDR_LOWER:\s(\d+\.\d+).*\n.*\n'
                                   r'NDR_UPPER:\s(\d+\.\d+).*\n'
                                   r'PDR_LOWER:\s(\d+\.\d+).*\n.*\n'
                                   r'PDR_UPPER:\s(\d+\.\d+)')
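
    # The test message is expected to contain a block like the following
    # (illustrative values; the text after each rate is consumed by the
    # ".*" parts of the pattern above):
    #
    #   NDR_LOWER: 12345678.0 pps
    #   ...
    #   NDR_UPPER: 12350000.0 pps
    #   PDR_LOWER: 12400000.0 pps
    #   ...
    #   PDR_UPPER: 12450000.0 pps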

    # TODO: Remove when definitely no NDRPDRDISC tests are used:
    REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')

    REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')

    REGEX_NDRPDR_LAT = re.compile(r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
                                  r'LATENCY.*\[\'(.*)\', \'(.*)\'\]')
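
    # The NDR/PDR latency block in the test message is expected to look like
    # (illustrative min/avg/max values, one quoted string per direction):
    #
    #   LATENCY ... ['10/15/30', '12/18/35']
    #   ...
    #   LATENCY ... ['11/16/32', '13/19/37']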

    REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
                                 r'[\D\d]*')

    REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*|"
                                   r"VPP Version:\s*)(.*)")

    REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
                                    r"(RTE Version: 'DPDK )(.*)(')")

    REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')

    REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
                           r'tx\s(\d*),\srx\s(\d*)')

    REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
                            r' in packets per second: \[(.*)\]')
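
    # Messages matched by the two patterns above are expected to look like
    # these single-line examples (illustrative values only):
    #
    #   MaxReceivedRate_Results [pkts/10sec]: tx 1234567, rx 1234000
    #   Maximum Receive Rate trial results in packets per second: [12.0, 12.1]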

    REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')

    REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')

    REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')

    REGEX_TC_NUMBER = re.compile(r'tc[0-9]{2}-')
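
    # For illustration, the four patterns above match substrings such as
    # "2t1c", "-2t1c-", "-2c-" and "tc01-" respectively.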

    def __init__(self, metadata, mapping, ignore):
        """Initialisation.

        :param metadata: Key-value pairs to be included in "metadata" part of
            JSON structure.
        :param mapping: Mapping of the old names of test cases to the new
            (actual) ones.
        :param ignore: List of TCs to be ignored.
        :type metadata: dict
        :type mapping: dict
        :type ignore: list
        """

        # Type of message to parse out from the test messages
        self._msg_type = None

        # VPP version
        self._version = None

        # Timestamp
        self._timestamp = None

        # Testbed. The testbed is identified by TG node IP address.
        self._testbed = None

        # Mapping of TCs long names
        self._mapping = mapping

        # Ignore list
        self._ignore = ignore

        self._lookup_kw_nr = 0

        # Number of VAT History messages found:
        # 0 - no message
        # 1 - VAT History of DUT1
        # 2 - VAT History of DUT2
        self._vat_history_lookup_nr = 0

        # Number of Show Running messages found
        # 0 - no message
        # 1 - Show run message found
        self._show_run_lookup_nr = 0

        # Test ID of currently processed test - the lowercase full path to
        # the test
        self._test_ID = None

        # The main data structure
        self._data = {
            "metadata": OrderedDict(),
            "suites": OrderedDict(),
            "tests": OrderedDict()
        }

        # Save the provided metadata
        for key, val in metadata.items():
            self._data["metadata"][key] = val

        # Dictionary defining the methods used to parse different types of
        # messages
        self.parse_msg = {
            "timestamp": self._get_timestamp,
            "vpp-version": self._get_vpp_version,
            "dpdk-version": self._get_dpdk_version,
            "teardown-vat-history": self._get_vat_history,
            "test-show-runtime": self._get_show_run,
            "testbed": self._get_testbed
        }

    @property
    def data(self):
        """Getter - Data parsed from the XML file.

        :returns: Data parsed from the XML file.
        :rtype: dict
        """
        return self._data

    def _get_testbed(self, msg):
        """Called when extraction of testbed IP is required.
        The testbed is identified by TG node IP address.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if msg.message.count("Arguments:"):
            message = str(msg.message).replace(' ', '').replace('\n', '').\
                replace("'", '"').replace('b"', '"').\
                replace("honeycom", "honeycomb")
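            # After the replaces above the payload is expected to have the
            # form 'Arguments:[{...}]'; stripping the first 11 characters
            # and the trailing bracket leaves a JSON object for loads().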
            message = loads(message[11:-1])
            try:
                self._testbed = message["TG"]["host"]
            except (KeyError, ValueError):
                pass
            finally:
                self._data["metadata"]["testbed"] = self._testbed
                self._msg_type = None

    def _get_vpp_version(self, msg):
        """Called when extraction of VPP version is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if msg.message.count("return STDOUT Version:") or \
            msg.message.count("VPP Version:"):
            self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
                                group(2))
            self._data["metadata"]["version"] = self._version
            self._msg_type = None

    def _get_dpdk_version(self, msg):
        """Called when extraction of DPDK version is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if msg.message.count("return STDOUT testpmd"):
            try:
                self._version = str(re.search(
                    self.REGEX_VERSION_DPDK, msg.message).group(4))
                self._data["metadata"]["version"] = self._version
            except (IndexError, AttributeError):
                pass
            finally:
                self._msg_type = None

    def _get_timestamp(self, msg):
        """Called when extraction of timestamp is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        self._timestamp = msg.timestamp[:14]
        self._data["metadata"]["generated"] = self._timestamp
        self._msg_type = None

    def _get_vat_history(self, msg):
        """Called when extraction of VAT command history is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """
        if msg.message.count("VAT command history:"):
            self._vat_history_lookup_nr += 1
            if self._vat_history_lookup_nr == 1:
                self._data["tests"][self._test_ID]["vat-history"] = str()
            else:
                self._msg_type = None
            text = re.sub(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3} "
                          r"VAT command history:", "", msg.message, count=1). \
                replace("\n\n", "\n").replace('\n', ' |br| ').\
                replace('\r', '').replace('"', "'")

            self._data["tests"][self._test_ID]["vat-history"] += " |br| "
            self._data["tests"][self._test_ID]["vat-history"] += \
                "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text

    def _get_show_run(self, msg):
        """Called when extraction of VPP operational data (output of CLI command
        Show Runtime) is required.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """
        if msg.message.count("return STDOUT Thread "):
            self._show_run_lookup_nr += 1
            if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
                self._data["tests"][self._test_ID]["show-run"] = str()
            if self._lookup_kw_nr > 1:
                self._msg_type = None
            if self._show_run_lookup_nr == 1:
                text = msg.message.replace("vat# ", "").\
                    replace("return STDOUT ", "").replace("\n\n", "\n").\
                    replace('\n', ' |br| ').\
                    replace('\r', '').replace('"', "'")
                try:
                    self._data["tests"][self._test_ID]["show-run"] += " |br| "
                    self._data["tests"][self._test_ID]["show-run"] += \
                        "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
                except KeyError:
                    pass

    # TODO: Remove when definitely no NDRPDRDISC tests are used:
    def _get_latency(self, msg, test_type):
        """Get the latency data from the test message.

        :param msg: Message to be parsed.
        :param test_type: Type of the test - NDR or PDR.
        :type msg: str
        :type test_type: str
        :returns: Latencies parsed from the message.
        :rtype: dict
        """

        if test_type == "NDR":
            groups = re.search(self.REGEX_LAT_NDR, msg)
            groups_range = range(1, 7)
        elif test_type == "PDR":
            groups = re.search(self.REGEX_LAT_PDR, msg)
            groups_range = range(1, 3)
        else:
            return {}

        latencies = list()
        for idx in groups_range:
            try:
                lat = [int(item) for item in str(groups.group(idx)).split('/')]
            except (AttributeError, ValueError):
                lat = [-1, -1, -1]
            latencies.append(lat)

        keys = ("min", "avg", "max")
        latency = {
            "direction1": {
            },
            "direction2": {
            }
        }

        latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
        latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
        if test_type == "NDR":
            latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
            latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
            latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
            latency["direction2"]["10"] = dict(zip(keys, latencies[5]))

        return latency

    def _get_ndrpdr_throughput(self, msg):
        """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
        message.

        :param msg: The test message to be parsed.
        :type msg: str
        :returns: Parsed data as a dict and the status (PASS/FAIL).
        :rtype: tuple(dict, str)
        """

        throughput = {
            "NDR": {"LOWER": -1.0, "UPPER": -1.0},
            "PDR": {"LOWER": -1.0, "UPPER": -1.0}
        }
        status = "FAIL"
        groups = re.search(self.REGEX_NDRPDR_RATE, msg)

        if groups is not None:
            try:
                throughput["NDR"]["LOWER"] = float(groups.group(1))
                throughput["NDR"]["UPPER"] = float(groups.group(2))
                throughput["PDR"]["LOWER"] = float(groups.group(3))
                throughput["PDR"]["UPPER"] = float(groups.group(4))
                status = "PASS"
            except (IndexError, ValueError):
                pass

        return throughput, status

    def _get_ndrpdr_latency(self, msg):
        """Get LATENCY from the test message.

        :param msg: The test message to be parsed.
        :type msg: str
        :returns: Parsed data as a dict and the status (PASS/FAIL).
        :rtype: tuple(dict, str)
        """

        latency = {
            "NDR": {
                "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
                "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
            },
            "PDR": {
                "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
                "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
            }
        }
        status = "FAIL"
        groups = re.search(self.REGEX_NDRPDR_LAT, msg)

        if groups is not None:
            keys = ("min", "avg", "max")
            try:
                latency["NDR"]["direction1"] = dict(
                    zip(keys, [float(l) for l in groups.group(1).split('/')]))
                latency["NDR"]["direction2"] = dict(
                    zip(keys, [float(l) for l in groups.group(2).split('/')]))
                latency["PDR"]["direction1"] = dict(
                    zip(keys, [float(l) for l in groups.group(3).split('/')]))
                latency["PDR"]["direction2"] = dict(
                    zip(keys, [float(l) for l in groups.group(4).split('/')]))
                status = "PASS"
            except (IndexError, ValueError):
                pass

        return latency, status

    def visit_suite(self, suite):
        """Implements traversing through the suite and its direct children.

        :param suite: Suite to process.
        :type suite: Suite
        :returns: Nothing.
        """
        if self.start_suite(suite) is not False:
            suite.suites.visit(self)
            suite.tests.visit(self)
            self.end_suite(suite)

    def start_suite(self, suite):
        """Called when suite starts.

        :param suite: Suite to process.
        :type suite: Suite
        :returns: Nothing.
        """

        try:
            parent_name = suite.parent.name
        except AttributeError:
            return

        doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
            replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
        doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)

        self._data["suites"][suite.longname.lower().replace('"', "'").
            replace(" ", "_")] = {
                "name": suite.name.lower(),
                "doc": doc_str,
                "parent": parent_name,
                "level": len(suite.longname.split("."))
            }

        suite.keywords.visit(self)

    def end_suite(self, suite):
        """Called when suite ends.

        :param suite: Suite to process.
        :type suite: Suite
        :returns: Nothing.
        """
        pass


    def visit_test(self, test):
        """Implements traversing through the test.

        :param test: Test to process.
        :type test: Test
        :returns: Nothing.
        """
        if self.start_test(test) is not False:
            test.keywords.visit(self)
            self.end_test(test)

    def start_test(self, test):
        """Called when test starts.

        :param test: Test to process.
        :type test: Test
        :returns: Nothing.
        """

        longname_orig = test.longname.lower()

        # Check the ignore list
        if longname_orig in self._ignore:
            return

        tags = [str(tag) for tag in test.tags]
        test_result = dict()

        # Change the TC long name and name if defined in the mapping table
        longname = self._mapping.get(longname_orig, None)
        if longname is not None:
            name = longname.split('.')[-1]
            logging.debug("{0}\n{1}\n{2}\n{3}".format(
                self._data["metadata"], longname_orig, longname, name))
        else:
            longname = longname_orig
            name = test.name.lower()

        # Remove TC number from the TC long name (backward compatibility):
        self._test_ID = re.sub(self.REGEX_TC_NUMBER, "", longname)
        # Remove TC number from the TC name (not needed):
        test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)

        test_result["parent"] = test.parent.name.lower()
        test_result["tags"] = tags
        doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
            replace('\r', '').replace('[', ' |br| [')
        test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
        test_result["msg"] = test.message.replace('\n', ' |br| '). \
            replace('\r', '').replace('"', "'")
        test_result["type"] = "FUNC"
        test_result["status"] = test.status

        if "PERFTEST" in tags:
            # Replace info about cores (e.g. -1c-) with the info about threads
            # and cores (e.g. -1t1c-) in the long test case names and in the
            # test case names if necessary.
            groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
            if not groups:
                tag_count = 0
                for tag in test_result["tags"]:
                    groups = re.search(self.REGEX_TC_TAG, tag)
                    if groups:
                        tag_count += 1
                        tag_tc = tag

                if tag_count == 1:
                    self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
                                           "-{0}-".format(tag_tc.lower()),
                                           self._test_ID,
                                           count=1)
                    test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
                                                 "-{0}-".format(tag_tc.lower()),
                                                 test_result["name"],
                                                 count=1)
                else:
                    test_result["status"] = "FAIL"
                    self._data["tests"][self._test_ID] = test_result
                    logging.debug("The test '{0}' has none or more than one "
                                  "multi-threading tag.".format(self._test_ID))
                    logging.debug("Tags: {0}".format(test_result["tags"]))
                    return

        if test.status == "PASS" and ("NDRPDRDISC" in tags or
                                      "NDRPDR" in tags or
                                      "TCP" in tags or
                                      "MRR" in tags or
                                      "BMRR" in tags):
            # TODO: Remove when definitely no NDRPDRDISC tests are used:
            if "NDRDISC" in tags:
                test_result["type"] = "NDR"
            # TODO: Remove when definitely no NDRPDRDISC tests are used:
            elif "PDRDISC" in tags:
                test_result["type"] = "PDR"
            elif "NDRPDR" in tags:
                test_result["type"] = "NDRPDR"
            elif "TCP" in tags:
                test_result["type"] = "TCP"
            elif "MRR" in tags:
                test_result["type"] = "MRR"
            elif "FRMOBL" in tags or "BMRR" in tags:
                test_result["type"] = "BMRR"
            else:
                test_result["status"] = "FAIL"
                self._data["tests"][self._test_ID] = test_result
                return

            # TODO: Remove when definitely no NDRPDRDISC tests are used:
            if test_result["type"] in ("NDR", "PDR"):
                try:
                    rate_value = str(re.search(
                        self.REGEX_RATE, test.message).group(1))
                except AttributeError:
                    rate_value = "-1"
                try:
                    rate_unit = str(re.search(
                        self.REGEX_RATE, test.message).group(2))
                except AttributeError:
                    rate_unit = "-1"

                test_result["throughput"] = dict()
                test_result["throughput"]["value"] = \
                    int(rate_value.split('.')[0])
                test_result["throughput"]["unit"] = rate_unit
                test_result["latency"] = \
                    self._get_latency(test.message, test_result["type"])
                if test_result["type"] == "PDR":
                    test_result["lossTolerance"] = str(re.search(
                        self.REGEX_TOLERANCE, test.message).group(1))

            elif test_result["type"] in ("NDRPDR", ):
                test_result["throughput"], test_result["status"] = \
                    self._get_ndrpdr_throughput(test.message)
                test_result["latency"], test_result["status"] = \
                    self._get_ndrpdr_latency(test.message)

            elif test_result["type"] in ("TCP", ):
                groups = re.search(self.REGEX_TCP, test.message)
                test_result["result"] = int(groups.group(2))

            elif test_result["type"] in ("MRR", "BMRR"):
                test_result["result"] = dict()
                groups = re.search(self.REGEX_BMRR, test.message)
                if groups is not None:
                    items_str = groups.group(1)
                    items_float = [float(item.strip()) for item
                                   in items_str.split(",")]
                    metadata = AvgStdevMetadataFactory.from_data(items_float)
                    # Next two lines have been introduced in CSIT-1179,
                    # to be removed in CSIT-1180.
                    metadata.size = 1
                    metadata.stdev = 0.0
                    test_result["result"]["receive-rate"] = metadata
                else:
                    groups = re.search(self.REGEX_MRR, test.message)
                    test_result["result"]["receive-rate"] = \
                        AvgStdevMetadataFactory.from_data([
                            float(groups.group(3)) / float(groups.group(1)), ])

        self._data["tests"][self._test_ID] = test_result

    def end_test(self, test):
        """Called when test ends.

        :param test: Test to process.
        :type test: Test
        :returns: Nothing.
        """
        pass

    def visit_keyword(self, keyword):
        """Implements traversing through the keyword and its child keywords.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        :returns: Nothing.
        """
        if self.start_keyword(keyword) is not False:
            self.end_keyword(keyword)

    def start_keyword(self, keyword):
        """Called when keyword starts. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        :returns: Nothing.
        """
        try:
            if keyword.type == "setup":
                self.visit_setup_kw(keyword)
            elif keyword.type == "teardown":
                self._lookup_kw_nr = 0
                self.visit_teardown_kw(keyword)
            else:
                self._lookup_kw_nr = 0
                self.visit_test_kw(keyword)
        except AttributeError:
            pass

    def end_keyword(self, keyword):
        """Called when keyword ends. Default implementation does nothing.

        :param keyword: Keyword to process.
        :type keyword: Keyword
        :returns: Nothing.
        """
        pass

    def visit_test_kw(self, test_kw):
        """Implements traversing through the test keyword and its child
        keywords.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        :returns: Nothing.
        """
        for keyword in test_kw.keywords:
            if self.start_test_kw(keyword) is not False:
                self.visit_test_kw(keyword)
                self.end_test_kw(keyword)

    def start_test_kw(self, test_kw):
        """Called when test keyword starts. Default implementation does
        nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        :returns: Nothing.
        """
        if test_kw.name.count("Show Runtime Counters On All Duts"):
            self._lookup_kw_nr += 1
            self._show_run_lookup_nr = 0
            self._msg_type = "test-show-runtime"
        elif test_kw.name.count("Start The L2fwd Test") and not self._version:
            self._msg_type = "dpdk-version"
        else:
            return
        test_kw.messages.visit(self)

    def end_test_kw(self, test_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param test_kw: Keyword to process.
        :type test_kw: Keyword
        :returns: Nothing.
        """
        pass

    def visit_setup_kw(self, setup_kw):
        """Implements traversing through the setup keyword and its child
        keywords.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        :returns: Nothing.
        """
        for keyword in setup_kw.keywords:
            if self.start_setup_kw(keyword) is not False:
                self.visit_setup_kw(keyword)
                self.end_setup_kw(keyword)

    def start_setup_kw(self, setup_kw):
        """Called when setup keyword starts. Default implementation does
        nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        :returns: Nothing.
        """
        if setup_kw.name.count("Show Vpp Version On All Duts") \
                and not self._version:
            self._msg_type = "vpp-version"
        elif setup_kw.name.count("Setup performance global Variables") \
                and not self._timestamp:
            self._msg_type = "timestamp"
        elif setup_kw.name.count("Setup Framework") and not self._testbed:
            self._msg_type = "testbed"
        else:
            return
        setup_kw.messages.visit(self)

    def end_setup_kw(self, setup_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param setup_kw: Keyword to process.
        :type setup_kw: Keyword
        :returns: Nothing.
        """
        pass

    def visit_teardown_kw(self, teardown_kw):
        """Implements traversing through the teardown keyword and its child
        keywords.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        :returns: Nothing.
        """
        for keyword in teardown_kw.keywords:
            if self.start_teardown_kw(keyword) is not False:
                self.visit_teardown_kw(keyword)
                self.end_teardown_kw(keyword)

    def start_teardown_kw(self, teardown_kw):
        """Called when teardown keyword starts. Default implementation does
        nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        :returns: Nothing.
        """

        if teardown_kw.name.count("Show Vat History On All Duts"):
            self._vat_history_lookup_nr = 0
            self._msg_type = "teardown-vat-history"
            teardown_kw.messages.visit(self)

    def end_teardown_kw(self, teardown_kw):
        """Called when keyword ends. Default implementation does nothing.

        :param teardown_kw: Keyword to process.
        :type teardown_kw: Keyword
        :returns: Nothing.
        """
        pass

    def visit_message(self, msg):
        """Implements visiting the message.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """
        if self.start_message(msg) is not False:
            self.end_message(msg)

    def start_message(self, msg):
        """Called when message starts. Gets the required information from the
        message using the parser selected by the current message type.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if self._msg_type:
            self.parse_msg[self._msg_type](msg)

    def end_message(self, msg):
        """Called when message ends. Default implementation does nothing.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """
        pass


class InputData(object):
    """Input data

    The data is extracted from output.xml files generated by Jenkins jobs and
    stored in pandas' Series.

    The data structure:
    - job name
      - build number
        - metadata
          (as described in ExecutionChecker documentation)
        - suites
          (as described in ExecutionChecker documentation)
        - tests
          (as described in ExecutionChecker documentation)
    """

    def __init__(self, spec):
        """Initialization.

        :param spec: Specification.
        :type spec: Specification
        """

        # Specification:
        self._cfg = spec

        # Data store:
        self._input_data = pd.Series()

    @property
    def data(self):
        """Getter - Input data.

        :returns: Input data.
        :rtype: pandas.Series
        """
        return self._input_data

    def metadata(self, job, build):
        """Getter - metadata.

        :param job: Job whose metadata is requested.
        :param build: Build whose metadata is requested.
        :type job: str
        :type build: str
        :returns: Metadata.
        :rtype: pandas.Series
        """

        return self.data[job][build]["metadata"]

    def suites(self, job, build):
        """Getter - suites.

        :param job: Job whose suites are requested.
        :param build: Build whose suites are requested.
        :type job: str
        :type build: str
        :returns: Suites.
        :rtype: pandas.Series
        """

        return self.data[job][str(build)]["suites"]

    def tests(self, job, build):
        """Getter - tests.

        :param job: Job whose tests are requested.
        :param build: Build whose tests are requested.
        :type job: str
        :type build: str
        :returns: Tests.
        :rtype: pandas.Series
        """

        return self.data[job][build]["tests"]

    def _parse_tests(self, job, build, log):
        """Process data from robot output.xml file and return JSON structured
        data.

        :param job: The name of the job whose build output data will be
            processed.
        :param build: The build whose output data will be processed.
        :param log: List of log messages.
        :type job: str
        :type build: dict
        :type log: list of tuples (severity, msg)
        :returns: JSON data structure.
        :rtype: dict
        """

        metadata = {
            "job": job,
            "build": build
        }

        with open(build["file-name"], 'r') as data_file:
            try:
                result = ExecutionResult(data_file)
            except errors.DataError as err:
                log.append(("ERROR", "Error occurred while parsing output.xml: "
                                     "{0}".format(err)))
                return None
        checker = ExecutionChecker(metadata, self._cfg.mapping,
                                   self._cfg.ignore)
        result.visit(checker)

        return checker.data

    def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
        """Download and parse the input data file.

        :param pid: PID of the process executing this method.
        :param data_queue: Shared memory between processes. Queue which keeps
            the result data. This data is then read by the main process and
            used in further processing.
        :param job: Name of the Jenkins job which generated the processed input
            file.
        :param build: Information about the Jenkins build which generated the
            processed input file.
        :param repeat: Repeat the download the specified number of times if not
            successful.
        :type pid: int
        :type data_queue: multiprocessing.Manager().Queue()
        :type job: str
        :type build: dict
        :type repeat: int
        """

        logs = list()

        logging.info("  Processing the job/build: {0}: {1}".
                     format(job, build["build"]))

        logs.append(("INFO", "  Processing the job/build: {0}: {1}".
                     format(job, build["build"])))

        state = "failed"
        success = False
        data = None
        do_repeat = repeat
        while do_repeat:
            success = download_and_unzip_data_file(self._cfg, job, build, pid,
                                                   logs)
            if success:
                break
            do_repeat -= 1
        if not success:
            logs.append(("ERROR", "It is not possible to download the input "
                                  "data file from the job '{job}', build "
                                  "'{build}', or it is damaged. Skipped.".
                         format(job=job, build=build["build"])))
        if success:
            logs.append(("INFO", "  Processing data from the build '{0}' ...".
                         format(build["build"])))
            data = self._parse_tests(job, build, logs)
            if data is None:
                logs.append(("ERROR", "Input data file from the job '{job}', "
                                      "build '{build}' is damaged. Skipped.".
                             format(job=job, build=build["build"])))
            else:
                state = "processed"

            try:
                remove(build["file-name"])
            except OSError as err:
                logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
                             format(build["file-name"], repr(err))))

        # If the time period is defined in the specification file, remove the
        # data and files which are outside the time period.
        timeperiod = self._cfg.input.get("time-period", None)
        if timeperiod and data:
            now = dt.utcnow()
            timeperiod = timedelta(int(timeperiod))
            metadata = data.get("metadata", None)
            if metadata:
                generated = metadata.get("generated", None)
                if generated:
                    generated = dt.strptime(generated, "%Y%m%d %H:%M")
                    if (now - generated) > timeperiod:
                        # Remove the data and the file:
                        state = "removed"
                        data = None
                        logs.append(
                            ("INFO",
                             "    The build {job}/{build} is outdated, will be "
                             "removed".format(job=job, build=build["build"])))
                        file_name = self._cfg.input["file-name"]
                        full_name = join(
                            self._cfg.environment["paths"]["DIR[WORKING,DATA]"],
                            "{job}{sep}{build}{sep}{name}".
                                format(job=job,
                                       sep=SEPARATOR,
                                       build=build["build"],
                                       name=file_name))
                        try:
                            remove(full_name)
                            logs.append(("INFO",
                                         "    The file {name} has been removed".
                                         format(name=full_name)))
                        except OSError as err:
                            logs.append(("ERROR",
                                        "Cannot remove the file '{0}': {1}".
                                        format(full_name, repr(err))))

        logs.append(("INFO", "  Done."))

        result = {
            "data": data,
            "state": state,
            "job": job,
            "build": build,
            "logs": logs
        }
        data_queue.put(result)

    def download_and_parse_data(self, repeat=1):
        """Download the input data files, parse input data from input files and
        store in pandas' Series.

        :param repeat: Repeat the download the specified number of times if not
            successful.
        :type repeat: int
        """

        logging.info("Downloading and parsing input files ...")

        work_queue = multiprocessing.JoinableQueue()
        manager = multiprocessing.Manager()
        data_queue = manager.Queue()
        cpus = multiprocessing.cpu_count()

        workers = list()
        for cpu in range(cpus):
            worker = Worker(work_queue,
                            data_queue,
                            self._download_and_parse_build)
            worker.daemon = True
            worker.start()
            workers.append(worker)
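            # Pin the worker process to one CPU core; failures of the
            # external taskset command are silently discarded.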
            os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
                      format(cpu, worker.pid))

        for job, builds in self._cfg.builds.items():
            for build in builds:
                work_queue.put((job, build, repeat))

        work_queue.join()

        logging.info("Done.")

        while not data_queue.empty():
            result = data_queue.get()

            job = result["job"]
            build_nr = result["build"]["build"]

            if result["data"]:
                data = result["data"]
                build_data = pd.Series({
                    "metadata": pd.Series(data["metadata"].values(),
                                          index=data["metadata"].keys()),
                    "suites": pd.Series(data["suites"].values(),
                                        index=data["suites"].keys()),
                    "tests": pd.Series(data["tests"].values(),
                                       index=data["tests"].keys())})

                if self._input_data.get(job, None) is None:
                    self._input_data[job] = pd.Series()
                self._input_data[job][str(build_nr)] = build_data

                self._cfg.set_input_file_name(job, build_nr,
                                              result["build"]["file-name"])

            self._cfg.set_input_state(job, build_nr, result["state"])

            for item in result["logs"]:
                if item[0] == "INFO":
                    logging.info(item[1])
                elif item[0] == "ERROR":
                    logging.error(item[1])
                elif item[0] == "DEBUG":
                    logging.debug(item[1])
                elif item[0] == "CRITICAL":
                    logging.critical(item[1])
                elif item[0] == "WARNING":
                    logging.warning(item[1])

        del data_queue

        # Terminate all workers
        for worker in workers:
            worker.terminate()
            worker.join()

        logging.info("Done.")

    @staticmethod
    def _end_of_tag(tag_filter, start=0, closer="'"):
        """Return the index of the character which closes the tag.

        :param tag_filter: The string in which the end of the tag is searched
            for.
        :param start: The index where the searching is started.
        :param closer: The character which is the tag closer.
        :type tag_filter: str
        :type start: int
        :type closer: str
        :returns: The index of the tag closer.
        :rtype: int
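
        For example (illustrative)::

            InputData._end_of_tag("'NDRPDR' and '1T1C'")      # returns 7
            InputData._end_of_tag("'NDRPDR' and '1T1C'", 8)   # returns 18
            InputData._end_of_tag("no apostrophes here")      # returns None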
        """

        try:
            idx_opener = tag_filter.index(closer, start)
            return tag_filter.index(closer, idx_opener + 1)
        except ValueError:
            return None

    @staticmethod
    def _condition(tag_filter):
        """Create a conditional statement from the given tag filter.

        :param tag_filter: Filter based on tags from the element specification.
        :type tag_filter: str
        :returns: Conditional statement which can be evaluated.
        :rtype: str
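
        For example (illustrative), the filter::

            "'NDRPDR' and '1T1C'"

        is turned into the evaluable expression::

            "'NDRPDR' in tags and '1T1C' in tags"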
        """

        index = 0
        while True:
            index = InputData._end_of_tag(tag_filter, index)
            if index is None:
                return tag_filter
            index += 1
            tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]

    def filter_data(self, element, params=None, data_set="tests",
                    continue_on_error=False):
        """Filter required data from the given jobs and builds.

        The output data structure is:

        - job 1
          - build 1
            - test (or suite) 1 ID:
              - param 1
              - param 2
              ...
              - param n
            ...
            - test (or suite) n ID:
            ...
          ...
          - build n
        ...
        - job n

        :param element: Element which will use the filtered data.
        :param params: Parameters which will be included in the output. If
            None, all parameters are included.
        :param data_set: The set of data to be filtered: tests, suites,
            metadata.
        :param continue_on_error: Continue if an error occurs while reading
            the data. The item will be empty in that case.
        :type element: pandas.Series
        :type params: list
        :type data_set: str
        :type continue_on_error: bool
        :returns: Filtered data.
        :rtype: pandas.Series
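
        For illustration, an element specification might carry a filter such
        as (hypothetical values)::

            element["filter"] = "'NDRPDR' and '1T1C'"
            element["data"] = {"csit-vpp-perf-ndrpdr-daily": [1, 2]}

        Only tests tagged with both NDRPDR and 1T1C are then returned, for
        builds 1 and 2 of that job.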
        """

        try:
            if element["filter"] in ("all", "template"):
                cond = "True"
            else:
                cond = InputData._condition(element["filter"])
            logging.debug("   Filter: {0}".format(cond))
        except KeyError:
            logging.error("  No filter defined.")
            return None

        if params is None:
            params = element.get("parameters", None)
            if params:
                params.append("type")

        data = pd.Series()
        try:
            for job, builds in element["data"].items():
                data[job] = pd.Series()
                for build in builds:
                    data[job][str(build)] = pd.Series()
                    try:
                        data_iter = self.data[job][str(build)][data_set].\
                            iteritems()
                    except KeyError:
                        if continue_on_error:
                            continue
                        else:
                            return None
                    for test_ID, test_data in data_iter:
                        if eval(cond, {"tags": test_data.get("tags", "")}):
                            data[job][str(build)][test_ID] = pd.Series()
                            if params is None:
                                for param, val in test_data.items():
                                    data[job][str(build)][test_ID][param] = val
                            else:
                                for param in params:
                                    try:
                                        data[job][str(build)][test_ID][param] =\
                                            test_data[param]
                                    except KeyError:
                                        data[job][str(build)][test_ID][param] =\
                                            "No Data"
            return data

        except (KeyError, IndexError, ValueError) as err:
            logging.error("   Missing mandatory parameter in the element "
                          "specification: {0}".format(err))
            return None
        except AttributeError:
            return None
        except SyntaxError:
            logging.error("   The filter '{0}' is not correct. Check if all "
                          "tags are enclosed by apostrophes.".format(cond))
            return None

    @staticmethod
    def merge_data(data):
        """Merge data from multiple jobs and builds to a simple data structure.

        The output data structure is:

        - test (suite) 1 ID:
          - param 1
          - param 2
          ...
          - param n
        ...
        - test (suite) n ID:
        ...

        :param data: Data to merge.
        :type data: pandas.Series
        :returns: Merged data.
        :rtype: pandas.Series
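
        For illustration (hypothetical IDs), an input holding
        data["job1"]["1"]["suite.test-a"] and data["job2"]["5"]["suite.test-b"]
        is flattened to merged["suite.test-a"] and merged["suite.test-b"];
        if the same ID occurs in several builds, the last one processed wins.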
        """

        logging.info("    Merging data ...")

        merged_data = pd.Series()
        for _, builds in data.iteritems():
            for _, item in builds.iteritems():
                for ID, item_data in item.iteritems():
                    merged_data[ID] = item_data

        return merged_data