FIX: PAL: input data processing
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
34
35 from input_data_files import download_and_unzip_data_file
36 from utils import Worker
37
38
39 class ExecutionChecker(ResultVisitor):
40     """Class to traverse through the test suite structure.
41
42     The functionality implemented in this class generates a json structure:
43
44     Performance tests:
45
46     {
47         "metadata": {
48             "generated": "Timestamp",
49             "version": "SUT version",
50             "job": "Jenkins job name",
51             "build": "Information about the build"
52         },
53         "suites": {
54             "Suite long name 1": {
55                 "name": Suite name,
56                 "doc": "Suite 1 documentation",
57                 "parent": "Suite 1 parent",
58                 "level": "Level of the suite in the suite hierarchy"
59             }
60             "Suite long name N": {
61                 "name": Suite name,
62                 "doc": "Suite N documentation",
63                 "parent": "Suite 2 parent",
64                 "level": "Level of the suite in the suite hierarchy"
65             }
66         }
67         "tests": {
68             "ID": {
69                 "name": "Test name",
70                 "parent": "Name of the parent of the test",
71                 "doc": "Test documentation"
72                 "msg": "Test message"
73                 "tags": ["tag 1", "tag 2", "tag n"],
74                 "type": "PDR" | "NDR" | "TCP" | "MRR" | "BMRR",
75                 "throughput": {  # Only type: "PDR" | "NDR"
76                     "value": int,
77                     "unit": "pps" | "bps" | "percentage"
78                 },
79                 "latency": {  # Only type: "PDR" | "NDR"
80                     "direction1": {
81                         "100": {
82                             "min": int,
83                             "avg": int,
84                             "max": int
85                         },
86                         "50": {  # Only for NDR
87                             "min": int,
88                             "avg": int,
89                             "max": int
90                         },
91                         "10": {  # Only for NDR
92                             "min": int,
93                             "avg": int,
94                             "max": int
95                         }
96                     },
97                     "direction2": {
98                         "100": {
99                             "min": int,
100                             "avg": int,
101                             "max": int
102                         },
103                         "50": {  # Only for NDR
104                             "min": int,
105                             "avg": int,
106                             "max": int
107                         },
108                         "10": {  # Only for NDR
109                             "min": int,
110                             "avg": int,
111                             "max": int
112                         }
113                     }
114                 },
115                 "result": {  # Only type: "TCP"
116                     "value": int,
117                     "unit": "cps" | "rps"
118                 },
119                 "result": {  # Only type: "MRR" | "BMRR"
120                     "receive-rate": AvgStdevMetadata,
121                 },
122                 "lossTolerance": "lossTolerance",  # Only type: "PDR"
123                 "vat-history": "DUT1 and DUT2 VAT History"
124                 "show-run": "Show Run"
125             },
126             "ID" {
127                 # next test
128             }
129         }
130     }
131
132
133     Functional tests:
134
135     {
136         "metadata": {  # Optional
137             "version": "VPP version",
138             "job": "Jenkins job name",
139             "build": "Information about the build"
140         },
141         "suites": {
142             "Suite name 1": {
143                 "doc": "Suite 1 documentation",
144                 "parent": "Suite 1 parent",
145                 "level": "Level of the suite in the suite hierarchy"
146             }
147             "Suite name N": {
148                 "doc": "Suite N documentation",
149                 "parent": "Suite 2 parent",
150                 "level": "Level of the suite in the suite hierarchy"
151             }
152         }
153         "tests": {
154             "ID": {
155                 "name": "Test name",
156                 "parent": "Name of the parent of the test",
157                 "doc": "Test documentation"
158                 "msg": "Test message"
159                 "tags": ["tag 1", "tag 2", "tag n"],
160                 "vat-history": "DUT1 and DUT2 VAT History"
161                 "show-run": "Show Run"
162                 "status": "PASS" | "FAIL"
163             },
164             "ID" {
165                 # next test
166             }
167         }
168     }
169
170     .. note:: ID is the lowercase full path to the test.
171     """
172
173     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
174
175     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
176                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
177                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
178                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
179                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
180                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
181                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
182
183     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
184                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
185                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
186
187     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
188                                  r'[\D\d]*')
189
190     REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
191
192     REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
193                                     r"(RTE Version: 'DPDK )(.*)(')")
194
195     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
196
197     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
198                            r'tx\s(\d*),\srx\s(\d*)')
199
200     REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
201                             r' in packets per second: \[(.*)\]')
202
203     REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
204
205     REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')
206
207     REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
208
209     def __init__(self, metadata):
210         """Initialisation.
211
212         :param metadata: Key-value pairs to be included in "metadata" part of
213         JSON structure.
214         :type metadata: dict
215         """
216
217         # Type of message to parse out from the test messages
218         self._msg_type = None
219
220         # VPP version
221         self._version = None
222
223         # Timestamp
224         self._timestamp = None
225
226         # Number of VAT History messages found:
227         # 0 - no message
228         # 1 - VAT History of DUT1
229         # 2 - VAT History of DUT2
230         self._lookup_kw_nr = 0
231         self._vat_history_lookup_nr = 0
232
233         # Number of Show Running messages found
234         # 0 - no message
235         # 1 - Show run message found
236         self._show_run_lookup_nr = 0
237
238         # Test ID of currently processed test- the lowercase full path to the
239         # test
240         self._test_ID = None
241
242         # The main data structure
243         self._data = {
244             "metadata": OrderedDict(),
245             "suites": OrderedDict(),
246             "tests": OrderedDict()
247         }
248
249         # Save the provided metadata
250         for key, val in metadata.items():
251             self._data["metadata"][key] = val
252
253         # Dictionary defining the methods used to parse different types of
254         # messages
255         self.parse_msg = {
256             "timestamp": self._get_timestamp,
257             "vpp-version": self._get_vpp_version,
258             "dpdk-version": self._get_dpdk_version,
259             "teardown-vat-history": self._get_vat_history,
260             "test-show-runtime": self._get_show_run
261         }
262
263     @property
264     def data(self):
265         """Getter - Data parsed from the XML file.
266
267         :returns: Data parsed from the XML file.
268         :rtype: dict
269         """
270         return self._data
271
272     def _get_vpp_version(self, msg):
273         """Called when extraction of VPP version is required.
274
275         :param msg: Message to process.
276         :type msg: Message
277         :returns: Nothing.
278         """
279
280         if msg.message.count("return STDOUT Version:"):
281             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
282                                 group(2))
283             self._data["metadata"]["version"] = self._version
284             self._msg_type = None
285
286     def _get_dpdk_version(self, msg):
287         """Called when extraction of DPDK version is required.
288
289         :param msg: Message to process.
290         :type msg: Message
291         :returns: Nothing.
292         """
293
294         if msg.message.count("return STDOUT testpmd"):
295             try:
296                 self._version = str(re.search(
297                     self.REGEX_VERSION_DPDK, msg.message). group(4))
298                 self._data["metadata"]["version"] = self._version
299             except IndexError:
300                 pass
301             finally:
302                 self._msg_type = None
303
304     def _get_timestamp(self, msg):
305         """Called when extraction of timestamp is required.
306
307         :param msg: Message to process.
308         :type msg: Message
309         :returns: Nothing.
310         """
311
312         self._timestamp = msg.timestamp[:14]
313         self._data["metadata"]["generated"] = self._timestamp
314         self._msg_type = None
315
316     def _get_vat_history(self, msg):
317         """Called when extraction of VAT command history is required.
318
319         :param msg: Message to process.
320         :type msg: Message
321         :returns: Nothing.
322         """
323         if msg.message.count("VAT command history:"):
324             self._vat_history_lookup_nr += 1
325             if self._vat_history_lookup_nr == 1:
326                 self._data["tests"][self._test_ID]["vat-history"] = str()
327             else:
328                 self._msg_type = None
329             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
330                           "VAT command history:", "", msg.message, count=1). \
331                 replace("\n\n", "\n").replace('\n', ' |br| ').\
332                 replace('\r', '').replace('"', "'")
333
334             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
335             self._data["tests"][self._test_ID]["vat-history"] += \
336                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
337
338     def _get_show_run(self, msg):
339         """Called when extraction of VPP operational data (output of CLI command
340         Show Runtime) is required.
341
342         :param msg: Message to process.
343         :type msg: Message
344         :returns: Nothing.
345         """
346         if msg.message.count("return STDOUT Thread "):
347             self._show_run_lookup_nr += 1
348             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
349                 self._data["tests"][self._test_ID]["show-run"] = str()
350             if self._lookup_kw_nr > 1:
351                 self._msg_type = None
352             if self._show_run_lookup_nr == 1:
353                 text = msg.message.replace("vat# ", "").\
354                     replace("return STDOUT ", "").replace("\n\n", "\n").\
355                     replace('\n', ' |br| ').\
356                     replace('\r', '').replace('"', "'")
357                 try:
358                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
359                     self._data["tests"][self._test_ID]["show-run"] += \
360                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
361                 except KeyError:
362                     pass
363
364     def _get_latency(self, msg, test_type):
365         """Get the latency data from the test message.
366
367         :param msg: Message to be parsed.
368         :param test_type: Type of the test - NDR or PDR.
369         :type msg: str
370         :type test_type: str
371         :returns: Latencies parsed from the message.
372         :rtype: dict
373         """
374
375         if test_type == "NDR":
376             groups = re.search(self.REGEX_LAT_NDR, msg)
377             groups_range = range(1, 7)
378         elif test_type == "PDR":
379             groups = re.search(self.REGEX_LAT_PDR, msg)
380             groups_range = range(1, 3)
381         else:
382             return {}
383
384         latencies = list()
385         for idx in groups_range:
386             try:
387                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
388             except (AttributeError, ValueError):
389                 lat = [-1, -1, -1]
390             latencies.append(lat)
391
392         keys = ("min", "avg", "max")
393         latency = {
394             "direction1": {
395             },
396             "direction2": {
397             }
398         }
399
400         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
401         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
402         if test_type == "NDR":
403             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
404             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
405             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
406             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
407
408         return latency
409
410     def visit_suite(self, suite):
411         """Implements traversing through the suite and its direct children.
412
413         :param suite: Suite to process.
414         :type suite: Suite
415         :returns: Nothing.
416         """
417         if self.start_suite(suite) is not False:
418             suite.suites.visit(self)
419             suite.tests.visit(self)
420             self.end_suite(suite)
421
422     def start_suite(self, suite):
423         """Called when suite starts.
424
425         :param suite: Suite to process.
426         :type suite: Suite
427         :returns: Nothing.
428         """
429
430         try:
431             parent_name = suite.parent.name
432         except AttributeError:
433             return
434
435         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
436             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
437         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
438
439         self._data["suites"][suite.longname.lower().replace('"', "'").
440             replace(" ", "_")] = {
441                 "name": suite.name.lower(),
442                 "doc": doc_str,
443                 "parent": parent_name,
444                 "level": len(suite.longname.split("."))
445             }
446
447         suite.keywords.visit(self)
448
449     def end_suite(self, suite):
450         """Called when suite ends.
451
452         :param suite: Suite to process.
453         :type suite: Suite
454         :returns: Nothing.
455         """
456         pass
457
458     def visit_test(self, test):
459         """Implements traversing through the test.
460
461         :param test: Test to process.
462         :type test: Test
463         :returns: Nothing.
464         """
465         if self.start_test(test) is not False:
466             test.keywords.visit(self)
467             self.end_test(test)
468
469     def start_test(self, test):
470         """Called when test starts.
471
472         :param test: Test to process.
473         :type test: Test
474         :returns: Nothing.
475         """
476
477         tags = [str(tag) for tag in test.tags]
478         test_result = dict()
479         test_result["name"] = test.name.lower()
480         test_result["parent"] = test.parent.name.lower()
481         test_result["tags"] = tags
482         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
483             replace('\r', '').replace('[', ' |br| [')
484         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
485         test_result["msg"] = test.message.replace('\n', ' |br| '). \
486             replace('\r', '').replace('"', "'")
487         test_result["status"] = test.status
488         self._test_ID = test.longname.lower()
489         if test.status == "PASS" and ("NDRPDRDISC" in tags or
490                                       "TCP" in tags or
491                                       "MRR" in tags or
492                                       "BMRR" in tags):
493             if "NDRDISC" in tags:
494                 test_type = "NDR"
495             elif "PDRDISC" in tags:
496                 test_type = "PDR"
497             elif "TCP" in tags:
498                 test_type = "TCP"
499             elif "MRR" in tags:
500                 test_type = "MRR"
501             elif "FRMOBL" in tags or "BMRR" in tags:
502                 test_type = "BMRR"
503             else:
504                 test_result["status"] = "FAIL"
505                 self._data["tests"][self._test_ID] = test_result
506                 return
507
508             test_result["type"] = test_type
509
510             # Replace info about cores (e.g. -1c-) with the info about threads
511             # and cores (e.g. -1t1c-) in the long test case names and in the
512             # test case names if necessary.
513             groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
514             if not groups:
515                 tag_count = 0
516                 for tag in test_result["tags"]:
517                     groups = re.search(self.REGEX_TC_TAG, tag)
518                     if groups:
519                         tag_count += 1
520                         tag_tc = tag
521
522                 if tag_count == 1:
523                     self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
524                                            "-{0}-".format(tag_tc.lower()),
525                                            self._test_ID,
526                                            count=1)
527                     test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
528                                                  "-{0}-".format(tag_tc.lower()),
529                                                  test_result["name"],
530                                                  count=1)
531                 else:
532                     test_result["status"] = "FAIL"
533                     self._data["tests"][self._test_ID] = test_result
534                     logging.error("The test '{0}' has no or more than one "
535                                   "multi-threading tags.".format(self._test_ID))
536                     return
537
538             if test_type in ("NDR", "PDR"):
539                 try:
540                     rate_value = str(re.search(
541                         self.REGEX_RATE, test.message).group(1))
542                 except AttributeError:
543                     rate_value = "-1"
544                 try:
545                     rate_unit = str(re.search(
546                         self.REGEX_RATE, test.message).group(2))
547                 except AttributeError:
548                     rate_unit = "-1"
549
550                 test_result["throughput"] = dict()
551                 test_result["throughput"]["value"] = \
552                     int(rate_value.split('.')[0])
553                 test_result["throughput"]["unit"] = rate_unit
554                 test_result["latency"] = \
555                     self._get_latency(test.message, test_type)
556                 if test_type == "PDR":
557                     test_result["lossTolerance"] = str(re.search(
558                         self.REGEX_TOLERANCE, test.message).group(1))
559
560             elif test_type in ("TCP", ):
561                 groups = re.search(self.REGEX_TCP, test.message)
562                 test_result["result"] = dict()
563                 test_result["result"]["value"] = int(groups.group(2))
564                 test_result["result"]["unit"] = groups.group(1)
565
566             elif test_type in ("MRR", "BMRR"):
567                 test_result["result"] = dict()
568                 groups = re.search(self.REGEX_BMRR, test.message)
569                 if groups is not None:
570                     items_str = groups.group(1)
571                     items_float = [float(item.strip()) for item
572                                    in items_str.split(",")]
573                     test_result["result"]["receive-rate"] = \
574                         AvgStdevMetadataFactory.from_data(items_float)
575                 else:
576                     groups = re.search(self.REGEX_MRR, test.message)
577                     test_result["result"]["receive-rate"] = \
578                         AvgStdevMetadataFactory.from_data([
579                             float(groups.group(3)) / float(groups.group(1)), ])
580
581         self._data["tests"][self._test_ID] = test_result
582
583     def end_test(self, test):
584         """Called when test ends.
585
586         :param test: Test to process.
587         :type test: Test
588         :returns: Nothing.
589         """
590         pass
591
592     def visit_keyword(self, keyword):
593         """Implements traversing through the keyword and its child keywords.
594
595         :param keyword: Keyword to process.
596         :type keyword: Keyword
597         :returns: Nothing.
598         """
599         if self.start_keyword(keyword) is not False:
600             self.end_keyword(keyword)
601
602     def start_keyword(self, keyword):
603         """Called when keyword starts. Default implementation does nothing.
604
605         :param keyword: Keyword to process.
606         :type keyword: Keyword
607         :returns: Nothing.
608         """
609         try:
610             if keyword.type == "setup":
611                 self.visit_setup_kw(keyword)
612             elif keyword.type == "teardown":
613                 self._lookup_kw_nr = 0
614                 self.visit_teardown_kw(keyword)
615             else:
616                 self._lookup_kw_nr = 0
617                 self.visit_test_kw(keyword)
618         except AttributeError:
619             pass
620
621     def end_keyword(self, keyword):
622         """Called when keyword ends. Default implementation does nothing.
623
624         :param keyword: Keyword to process.
625         :type keyword: Keyword
626         :returns: Nothing.
627         """
628         pass
629
630     def visit_test_kw(self, test_kw):
631         """Implements traversing through the test keyword and its child
632         keywords.
633
634         :param test_kw: Keyword to process.
635         :type test_kw: Keyword
636         :returns: Nothing.
637         """
638         for keyword in test_kw.keywords:
639             if self.start_test_kw(keyword) is not False:
640                 self.visit_test_kw(keyword)
641                 self.end_test_kw(keyword)
642
643     def start_test_kw(self, test_kw):
644         """Called when test keyword starts. Default implementation does
645         nothing.
646
647         :param test_kw: Keyword to process.
648         :type test_kw: Keyword
649         :returns: Nothing.
650         """
651         if test_kw.name.count("Show Runtime Counters On All Duts"):
652             self._lookup_kw_nr += 1
653             self._show_run_lookup_nr = 0
654             self._msg_type = "test-show-runtime"
655         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
656             self._msg_type = "dpdk-version"
657         else:
658             return
659         test_kw.messages.visit(self)
660
661     def end_test_kw(self, test_kw):
662         """Called when keyword ends. Default implementation does nothing.
663
664         :param test_kw: Keyword to process.
665         :type test_kw: Keyword
666         :returns: Nothing.
667         """
668         pass
669
670     def visit_setup_kw(self, setup_kw):
671         """Implements traversing through the teardown keyword and its child
672         keywords.
673
674         :param setup_kw: Keyword to process.
675         :type setup_kw: Keyword
676         :returns: Nothing.
677         """
678         for keyword in setup_kw.keywords:
679             if self.start_setup_kw(keyword) is not False:
680                 self.visit_setup_kw(keyword)
681                 self.end_setup_kw(keyword)
682
683     def start_setup_kw(self, setup_kw):
684         """Called when teardown keyword starts. Default implementation does
685         nothing.
686
687         :param setup_kw: Keyword to process.
688         :type setup_kw: Keyword
689         :returns: Nothing.
690         """
691         if setup_kw.name.count("Show Vpp Version On All Duts") \
692                 and not self._version:
693             self._msg_type = "vpp-version"
694
695         elif setup_kw.name.count("Setup performance global Variables") \
696                 and not self._timestamp:
697             self._msg_type = "timestamp"
698         else:
699             return
700         setup_kw.messages.visit(self)
701
702     def end_setup_kw(self, setup_kw):
703         """Called when keyword ends. Default implementation does nothing.
704
705         :param setup_kw: Keyword to process.
706         :type setup_kw: Keyword
707         :returns: Nothing.
708         """
709         pass
710
711     def visit_teardown_kw(self, teardown_kw):
712         """Implements traversing through the teardown keyword and its child
713         keywords.
714
715         :param teardown_kw: Keyword to process.
716         :type teardown_kw: Keyword
717         :returns: Nothing.
718         """
719         for keyword in teardown_kw.keywords:
720             if self.start_teardown_kw(keyword) is not False:
721                 self.visit_teardown_kw(keyword)
722                 self.end_teardown_kw(keyword)
723
724     def start_teardown_kw(self, teardown_kw):
725         """Called when teardown keyword starts. Default implementation does
726         nothing.
727
728         :param teardown_kw: Keyword to process.
729         :type teardown_kw: Keyword
730         :returns: Nothing.
731         """
732
733         if teardown_kw.name.count("Show Vat History On All Duts"):
734             self._vat_history_lookup_nr = 0
735             self._msg_type = "teardown-vat-history"
736             teardown_kw.messages.visit(self)
737
738     def end_teardown_kw(self, teardown_kw):
739         """Called when keyword ends. Default implementation does nothing.
740
741         :param teardown_kw: Keyword to process.
742         :type teardown_kw: Keyword
743         :returns: Nothing.
744         """
745         pass
746
747     def visit_message(self, msg):
748         """Implements visiting the message.
749
750         :param msg: Message to process.
751         :type msg: Message
752         :returns: Nothing.
753         """
754         if self.start_message(msg) is not False:
755             self.end_message(msg)
756
757     def start_message(self, msg):
758         """Called when message starts. Get required information from messages:
759         - VPP version.
760
761         :param msg: Message to process.
762         :type msg: Message
763         :returns: Nothing.
764         """
765
766         if self._msg_type:
767             self.parse_msg[self._msg_type](msg)
768
769     def end_message(self, msg):
770         """Called when message ends. Default implementation does nothing.
771
772         :param msg: Message to process.
773         :type msg: Message
774         :returns: Nothing.
775         """
776         pass
777
778
779 class InputData(object):
780     """Input data
781
782     The data is extracted from output.xml files generated by Jenkins jobs and
783     stored in pandas' DataFrames.
784
785     The data structure:
786     - job name
787       - build number
788         - metadata
789           (as described in ExecutionChecker documentation)
790         - suites
791           (as described in ExecutionChecker documentation)
792         - tests
793           (as described in ExecutionChecker documentation)
794     """
795
796     def __init__(self, spec):
797         """Initialization.
798
799         :param spec: Specification.
800         :type spec: Specification
801         """
802
803         # Specification:
804         self._cfg = spec
805
806         # Data store:
807         self._input_data = pd.Series()
808
809     @property
810     def data(self):
811         """Getter - Input data.
812
813         :returns: Input data
814         :rtype: pandas.Series
815         """
816         return self._input_data
817
818     def metadata(self, job, build):
819         """Getter - metadata
820
821         :param job: Job which metadata we want.
822         :param build: Build which metadata we want.
823         :type job: str
824         :type build: str
825         :returns: Metadata
826         :rtype: pandas.Series
827         """
828
829         return self.data[job][build]["metadata"]
830
831     def suites(self, job, build):
832         """Getter - suites
833
834         :param job: Job which suites we want.
835         :param build: Build which suites we want.
836         :type job: str
837         :type build: str
838         :returns: Suites.
839         :rtype: pandas.Series
840         """
841
842         return self.data[job][str(build)]["suites"]
843
844     def tests(self, job, build):
845         """Getter - tests
846
847         :param job: Job which tests we want.
848         :param build: Build which tests we want.
849         :type job: str
850         :type build: str
851         :returns: Tests.
852         :rtype: pandas.Series
853         """
854
855         return self.data[job][build]["tests"]
856
857     @staticmethod
858     def _parse_tests(job, build, log):
859         """Process data from robot output.xml file and return JSON structured
860         data.
861
862         :param job: The name of job which build output data will be processed.
863         :param build: The build which output data will be processed.
864         :param log: List of log messages.
865         :type job: str
866         :type build: dict
867         :type log: list of tuples (severity, msg)
868         :returns: JSON data structure.
869         :rtype: dict
870         """
871
872         metadata = {
873             "job": job,
874             "build": build
875         }
876
877         with open(build["file-name"], 'r') as data_file:
878             try:
879                 result = ExecutionResult(data_file)
880             except errors.DataError as err:
881                 log.append(("ERROR", "Error occurred while parsing output.xml: "
882                                      "{0}".format(err)))
883                 return None
884         checker = ExecutionChecker(metadata)
885         result.visit(checker)
886
887         return checker.data
888
889     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
890         """Download and parse the input data file.
891
892         :param pid: PID of the process executing this method.
893         :param data_queue: Shared memory between processes. Queue which keeps
894             the result data. This data is then read by the main process and used
895             in further processing.
896         :param job: Name of the Jenkins job which generated the processed input
897             file.
898         :param build: Information about the Jenkins build which generated the
899             processed input file.
900         :param repeat: Repeat the download specified number of times if not
901             successful.
902         :type pid: int
903         :type data_queue: multiprocessing.Manager().Queue()
904         :type job: str
905         :type build: dict
906         :type repeat: int
907         """
908
909         logs = list()
910
911         logging.info("  Processing the job/build: {0}: {1}".
912                      format(job, build["build"]))
913
914         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
915                      format(job, build["build"])))
916
917         state = "failed"
918         success = False
919         data = None
920         do_repeat = repeat
921         while do_repeat:
922             success = download_and_unzip_data_file(self._cfg, job, build, pid,
923                                                    logs)
924             if success:
925                 break
926             do_repeat -= 1
927         if not success:
928             logs.append(("ERROR", "It is not possible to download the input "
929                                   "data file from the job '{job}', build "
930                                   "'{build}', or it is damaged. Skipped.".
931                          format(job=job, build=build["build"])))
932         if success:
933             logs.append(("INFO", "  Processing data from the build '{0}' ...".
934                          format(build["build"])))
935             data = InputData._parse_tests(job, build, logs)
936             if data is None:
937                 logs.append(("ERROR", "Input data file from the job '{job}', "
938                                       "build '{build}' is damaged. Skipped.".
939                              format(job=job, build=build["build"])))
940             else:
941                 state = "processed"
942
943             try:
944                 remove(build["file-name"])
945             except OSError as err:
946                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
947                              format(build["file-name"], err)))
948         logs.append(("INFO", "  Done."))
949
950         result = {
951             "data": data,
952             "state": state,
953             "job": job,
954             "build": build,
955             "logs": logs
956         }
957         data_queue.put(result)
958
959     def download_and_parse_data(self, repeat=1):
960         """Download the input data files, parse input data from input files and
961         store in pandas' Series.
962
963         :param repeat: Repeat the download specified number of times if not
964             successful.
965         :type repeat: int
966         """
967
968         logging.info("Downloading and parsing input files ...")
969
970         work_queue = multiprocessing.JoinableQueue()
971         manager = multiprocessing.Manager()
972         data_queue = manager.Queue()
973         cpus = multiprocessing.cpu_count()
974
975         workers = list()
976         for cpu in range(cpus):
977             worker = Worker(work_queue,
978                             data_queue,
979                             self._download_and_parse_build)
980             worker.daemon = True
981             worker.start()
982             workers.append(worker)
983             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
984                       format(cpu, worker.pid))
985
986         for job, builds in self._cfg.builds.items():
987             for build in builds:
988                 work_queue.put((job, build, repeat))
989
990         work_queue.join()
991
992         logging.info("Done.")
993
994         while not data_queue.empty():
995             result = data_queue.get()
996
997             job = result["job"]
998             build_nr = result["build"]["build"]
999
1000             if result["data"]:
1001                 data = result["data"]
1002                 build_data = pd.Series({
1003                     "metadata": pd.Series(data["metadata"].values(),
1004                                           index=data["metadata"].keys()),
1005                     "suites": pd.Series(data["suites"].values(),
1006                                         index=data["suites"].keys()),
1007                     "tests": pd.Series(data["tests"].values(),
1008                                        index=data["tests"].keys())})
1009
1010                 if self._input_data.get(job, None) is None:
1011                     self._input_data[job] = pd.Series()
1012                 self._input_data[job][str(build_nr)] = build_data
1013
1014                 self._cfg.set_input_file_name(job, build_nr,
1015                                               result["build"]["file-name"])
1016
1017             self._cfg.set_input_state(job, build_nr, result["state"])
1018
1019             for item in result["logs"]:
1020                 if item[0] == "INFO":
1021                     logging.info(item[1])
1022                 elif item[0] == "ERROR":
1023                     logging.error(item[1])
1024                 elif item[0] == "DEBUG":
1025                     logging.debug(item[1])
1026                 elif item[0] == "CRITICAL":
1027                     logging.critical(item[1])
1028                 elif item[0] == "WARNING":
1029                     logging.warning(item[1])
1030
1031         del data_queue
1032
1033         # Terminate all workers
1034         for worker in workers:
1035             worker.terminate()
1036             worker.join()
1037
1038         logging.info("Done.")
1039
1040     @staticmethod
1041     def _end_of_tag(tag_filter, start=0, closer="'"):
1042         """Return the index of character in the string which is the end of tag.
1043
1044         :param tag_filter: The string where the end of tag is being searched.
1045         :param start: The index where the searching is stated.
1046         :param closer: The character which is the tag closer.
1047         :type tag_filter: str
1048         :type start: int
1049         :type closer: str
1050         :returns: The index of the tag closer.
1051         :rtype: int
1052         """
1053
1054         try:
1055             idx_opener = tag_filter.index(closer, start)
1056             return tag_filter.index(closer, idx_opener + 1)
1057         except ValueError:
1058             return None
1059
1060     @staticmethod
1061     def _condition(tag_filter):
1062         """Create a conditional statement from the given tag filter.
1063
1064         :param tag_filter: Filter based on tags from the element specification.
1065         :type tag_filter: str
1066         :returns: Conditional statement which can be evaluated.
1067         :rtype: str
1068         """
1069
1070         index = 0
1071         while True:
1072             index = InputData._end_of_tag(tag_filter, index)
1073             if index is None:
1074                 return tag_filter
1075             index += 1
1076             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1077
1078     def filter_data(self, element, params=None, data_set="tests",
1079                     continue_on_error=False):
1080         """Filter required data from the given jobs and builds.
1081
1082         The output data structure is:
1083
1084         - job 1
1085           - build 1
1086             - test (or suite) 1 ID:
1087               - param 1
1088               - param 2
1089               ...
1090               - param n
1091             ...
1092             - test (or suite) n ID:
1093             ...
1094           ...
1095           - build n
1096         ...
1097         - job n
1098
1099         :param element: Element which will use the filtered data.
1100         :param params: Parameters which will be included in the output. If None,
1101         all parameters are included.
1102         :param data_set: The set of data to be filtered: tests, suites,
1103         metadata.
1104         :param continue_on_error: Continue if there is error while reading the
1105         data. The Item will be empty then
1106         :type element: pandas.Series
1107         :type params: list
1108         :type data_set: str
1109         :type continue_on_error: bool
1110         :returns: Filtered data.
1111         :rtype pandas.Series
1112         """
1113
1114         try:
1115             if element["filter"] in ("all", "template"):
1116                 cond = "True"
1117             else:
1118                 cond = InputData._condition(element["filter"])
1119             logging.debug("   Filter: {0}".format(cond))
1120         except KeyError:
1121             logging.error("  No filter defined.")
1122             return None
1123
1124         if params is None:
1125             params = element.get("parameters", None)
1126
1127         data = pd.Series()
1128         try:
1129             for job, builds in element["data"].items():
1130                 data[job] = pd.Series()
1131                 for build in builds:
1132                     data[job][str(build)] = pd.Series()
1133                     try:
1134                         data_iter = self.data[job][str(build)][data_set].\
1135                             iteritems()
1136                     except KeyError:
1137                         if continue_on_error:
1138                             continue
1139                         else:
1140                             return None
1141                     for test_ID, test_data in data_iter:
1142                         if eval(cond, {"tags": test_data.get("tags", "")}):
1143                             data[job][str(build)][test_ID] = pd.Series()
1144                             if params is None:
1145                                 for param, val in test_data.items():
1146                                     data[job][str(build)][test_ID][param] = val
1147                             else:
1148                                 for param in params:
1149                                     try:
1150                                         data[job][str(build)][test_ID][param] =\
1151                                             test_data[param]
1152                                     except KeyError:
1153                                         data[job][str(build)][test_ID][param] =\
1154                                             "No Data"
1155             return data
1156
1157         except (KeyError, IndexError, ValueError) as err:
1158             logging.error("   Missing mandatory parameter in the element "
1159                           "specification: {0}".format(err))
1160             return None
1161         except AttributeError:
1162             return None
1163         except SyntaxError:
1164             logging.error("   The filter '{0}' is not correct. Check if all "
1165                           "tags are enclosed by apostrophes.".format(cond))
1166             return None
1167
1168     @staticmethod
1169     def merge_data(data):
1170         """Merge data from more jobs and builds to a simple data structure.
1171
1172         The output data structure is:
1173
1174         - test (suite) 1 ID:
1175           - param 1
1176           - param 2
1177           ...
1178           - param n
1179         ...
1180         - test (suite) n ID:
1181         ...
1182
1183         :param data: Data to merge.
1184         :type data: pandas.Series
1185         :returns: Merged data.
1186         :rtype: pandas.Series
1187         """
1188
1189         logging.info("    Merging data ...")
1190
1191         merged_data = pd.Series()
1192         for _, builds in data.iteritems():
1193             for _, item in builds.iteritems():
1194                 for ID, item_data in item.iteritems():
1195                     merged_data[ID] = item_data
1196
1197         return merged_data