91c7747752a12900c6da3a2e212e1e8c611c801a
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
34
35 from input_data_files import download_and_unzip_data_file
36 from utils import Worker
37
38
39 class ExecutionChecker(ResultVisitor):
40     """Class to traverse through the test suite structure.
41
42     The functionality implemented in this class generates a json structure:
43
44     Performance tests:
45
46     {
47         "metadata": {
48             "generated": "Timestamp",
49             "version": "SUT version",
50             "job": "Jenkins job name",
51             "build": "Information about the build"
52         },
53         "suites": {
54             "Suite long name 1": {
55                 "name": Suite name,
56                 "doc": "Suite 1 documentation",
57                 "parent": "Suite 1 parent",
58                 "level": "Level of the suite in the suite hierarchy"
59             }
60             "Suite long name N": {
61                 "name": Suite name,
62                 "doc": "Suite N documentation",
63                 "parent": "Suite 2 parent",
64                 "level": "Level of the suite in the suite hierarchy"
65             }
66         }
67         "tests": {
68             "ID": {
69                 "name": "Test name",
70                 "parent": "Name of the parent of the test",
71                 "doc": "Test documentation"
72                 "msg": "Test message"
73                 "tags": ["tag 1", "tag 2", "tag n"],
74                 "type": "PDR" | "NDR" | "TCP" | "MRR" | "BMRR",
75                 "throughput": {  # Only type: "PDR" | "NDR"
76                     "value": int,
77                     "unit": "pps" | "bps" | "percentage"
78                 },
79                 "latency": {  # Only type: "PDR" | "NDR"
80                     "direction1": {
81                         "100": {
82                             "min": int,
83                             "avg": int,
84                             "max": int
85                         },
86                         "50": {  # Only for NDR
87                             "min": int,
88                             "avg": int,
89                             "max": int
90                         },
91                         "10": {  # Only for NDR
92                             "min": int,
93                             "avg": int,
94                             "max": int
95                         }
96                     },
97                     "direction2": {
98                         "100": {
99                             "min": int,
100                             "avg": int,
101                             "max": int
102                         },
103                         "50": {  # Only for NDR
104                             "min": int,
105                             "avg": int,
106                             "max": int
107                         },
108                         "10": {  # Only for NDR
109                             "min": int,
110                             "avg": int,
111                             "max": int
112                         }
113                     }
114                 },
115                 "result": {  # Only type: "TCP"
116                     "value": int,
117                     "unit": "cps" | "rps"
118                 },
119                 "result": {  # Only type: "MRR" | "BMRR"
120                     "receive-rate": AvgStdevMetadata,
121                 },
122                 "lossTolerance": "lossTolerance",  # Only type: "PDR"
123                 "vat-history": "DUT1 and DUT2 VAT History"
124                 "show-run": "Show Run"
125             },
126             "ID" {
127                 # next test
128             }
129         }
130     }
131
132
133     Functional tests:
134
135     {
136         "metadata": {  # Optional
137             "version": "VPP version",
138             "job": "Jenkins job name",
139             "build": "Information about the build"
140         },
141         "suites": {
142             "Suite name 1": {
143                 "doc": "Suite 1 documentation",
144                 "parent": "Suite 1 parent",
145                 "level": "Level of the suite in the suite hierarchy"
146             }
147             "Suite name N": {
148                 "doc": "Suite N documentation",
149                 "parent": "Suite 2 parent",
150                 "level": "Level of the suite in the suite hierarchy"
151             }
152         }
153         "tests": {
154             "ID": {
155                 "name": "Test name",
156                 "parent": "Name of the parent of the test",
157                 "doc": "Test documentation"
158                 "msg": "Test message"
159                 "tags": ["tag 1", "tag 2", "tag n"],
160                 "vat-history": "DUT1 and DUT2 VAT History"
161                 "show-run": "Show Run"
162                 "status": "PASS" | "FAIL"
163             },
164             "ID" {
165                 # next test
166             }
167         }
168     }
169
170     .. note:: ID is the lowercase full path to the test.
171     """
172
173     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
174
175     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
176                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
177                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
178                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
179                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
180                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
181                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
182
183     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
184                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
185                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
186
187     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
188                                  r'[\D\d]*')
189
190     REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
191
192     REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
193                                     r"(RTE Version: 'DPDK )(.*)(')")
194
195     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
196
197     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
198                            r'tx\s(\d*),\srx\s(\d*)')
199
200     REGEX_BMRR = re.compile(r'Maximum Receive Rate Results \[(.*)\]')
201
202     def __init__(self, metadata):
203         """Initialisation.
204
205         :param metadata: Key-value pairs to be included in "metadata" part of
206         JSON structure.
207         :type metadata: dict
208         """
209
210         # Type of message to parse out from the test messages
211         self._msg_type = None
212
213         # VPP version
214         self._version = None
215
216         # Timestamp
217         self._timestamp = None
218
219         # Number of VAT History messages found:
220         # 0 - no message
221         # 1 - VAT History of DUT1
222         # 2 - VAT History of DUT2
223         self._lookup_kw_nr = 0
224         self._vat_history_lookup_nr = 0
225
226         # Number of Show Running messages found
227         # 0 - no message
228         # 1 - Show run message found
229         self._show_run_lookup_nr = 0
230
231         # Test ID of currently processed test- the lowercase full path to the
232         # test
233         self._test_ID = None
234
235         # The main data structure
236         self._data = {
237             "metadata": OrderedDict(),
238             "suites": OrderedDict(),
239             "tests": OrderedDict()
240         }
241
242         # Save the provided metadata
243         for key, val in metadata.items():
244             self._data["metadata"][key] = val
245
246         # Dictionary defining the methods used to parse different types of
247         # messages
248         self.parse_msg = {
249             "timestamp": self._get_timestamp,
250             "vpp-version": self._get_vpp_version,
251             "dpdk-version": self._get_dpdk_version,
252             "teardown-vat-history": self._get_vat_history,
253             "test-show-runtime": self._get_show_run
254         }
255
256     @property
257     def data(self):
258         """Getter - Data parsed from the XML file.
259
260         :returns: Data parsed from the XML file.
261         :rtype: dict
262         """
263         return self._data
264
265     def _get_vpp_version(self, msg):
266         """Called when extraction of VPP version is required.
267
268         :param msg: Message to process.
269         :type msg: Message
270         :returns: Nothing.
271         """
272
273         if msg.message.count("return STDOUT Version:"):
274             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
275                                 group(2))
276             self._data["metadata"]["version"] = self._version
277             self._msg_type = None
278
279     def _get_dpdk_version(self, msg):
280         """Called when extraction of DPDK version is required.
281
282         :param msg: Message to process.
283         :type msg: Message
284         :returns: Nothing.
285         """
286
287         if msg.message.count("return STDOUT testpmd"):
288             try:
289                 self._version = str(re.search(
290                     self.REGEX_VERSION_DPDK, msg.message). group(4))
291                 self._data["metadata"]["version"] = self._version
292             except IndexError:
293                 pass
294             finally:
295                 self._msg_type = None
296
297     def _get_timestamp(self, msg):
298         """Called when extraction of timestamp is required.
299
300         :param msg: Message to process.
301         :type msg: Message
302         :returns: Nothing.
303         """
304
305         self._timestamp = msg.timestamp[:14]
306         self._data["metadata"]["generated"] = self._timestamp
307         self._msg_type = None
308
309     def _get_vat_history(self, msg):
310         """Called when extraction of VAT command history is required.
311
312         :param msg: Message to process.
313         :type msg: Message
314         :returns: Nothing.
315         """
316         if msg.message.count("VAT command history:"):
317             self._vat_history_lookup_nr += 1
318             if self._vat_history_lookup_nr == 1:
319                 self._data["tests"][self._test_ID]["vat-history"] = str()
320             else:
321                 self._msg_type = None
322             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
323                           "VAT command history:", "", msg.message, count=1). \
324                 replace("\n\n", "\n").replace('\n', ' |br| ').\
325                 replace('\r', '').replace('"', "'")
326
327             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
328             self._data["tests"][self._test_ID]["vat-history"] += \
329                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
330
331     def _get_show_run(self, msg):
332         """Called when extraction of VPP operational data (output of CLI command
333         Show Runtime) is required.
334
335         :param msg: Message to process.
336         :type msg: Message
337         :returns: Nothing.
338         """
339         if msg.message.count("return STDOUT Thread "):
340             self._show_run_lookup_nr += 1
341             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
342                 self._data["tests"][self._test_ID]["show-run"] = str()
343             if self._lookup_kw_nr > 1:
344                 self._msg_type = None
345             if self._show_run_lookup_nr == 1:
346                 text = msg.message.replace("vat# ", "").\
347                     replace("return STDOUT ", "").replace("\n\n", "\n").\
348                     replace('\n', ' |br| ').\
349                     replace('\r', '').replace('"', "'")
350                 try:
351                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
352                     self._data["tests"][self._test_ID]["show-run"] += \
353                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
354                 except KeyError:
355                     pass
356
357     def _get_latency(self, msg, test_type):
358         """Get the latency data from the test message.
359
360         :param msg: Message to be parsed.
361         :param test_type: Type of the test - NDR or PDR.
362         :type msg: str
363         :type test_type: str
364         :returns: Latencies parsed from the message.
365         :rtype: dict
366         """
367
368         if test_type == "NDR":
369             groups = re.search(self.REGEX_LAT_NDR, msg)
370             groups_range = range(1, 7)
371         elif test_type == "PDR":
372             groups = re.search(self.REGEX_LAT_PDR, msg)
373             groups_range = range(1, 3)
374         else:
375             return {}
376
377         latencies = list()
378         for idx in groups_range:
379             try:
380                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
381             except (AttributeError, ValueError):
382                 lat = [-1, -1, -1]
383             latencies.append(lat)
384
385         keys = ("min", "avg", "max")
386         latency = {
387             "direction1": {
388             },
389             "direction2": {
390             }
391         }
392
393         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
394         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
395         if test_type == "NDR":
396             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
397             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
398             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
399             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
400
401         return latency
402
403     def visit_suite(self, suite):
404         """Implements traversing through the suite and its direct children.
405
406         :param suite: Suite to process.
407         :type suite: Suite
408         :returns: Nothing.
409         """
410         if self.start_suite(suite) is not False:
411             suite.suites.visit(self)
412             suite.tests.visit(self)
413             self.end_suite(suite)
414
415     def start_suite(self, suite):
416         """Called when suite starts.
417
418         :param suite: Suite to process.
419         :type suite: Suite
420         :returns: Nothing.
421         """
422
423         try:
424             parent_name = suite.parent.name
425         except AttributeError:
426             return
427
428         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
429             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
430         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
431
432         self._data["suites"][suite.longname.lower().replace('"', "'").
433             replace(" ", "_")] = {
434                 "name": suite.name.lower(),
435                 "doc": doc_str,
436                 "parent": parent_name,
437                 "level": len(suite.longname.split("."))
438             }
439
440         suite.keywords.visit(self)
441
442     def end_suite(self, suite):
443         """Called when suite ends.
444
445         :param suite: Suite to process.
446         :type suite: Suite
447         :returns: Nothing.
448         """
449         pass
450
451     def visit_test(self, test):
452         """Implements traversing through the test.
453
454         :param test: Test to process.
455         :type test: Test
456         :returns: Nothing.
457         """
458         if self.start_test(test) is not False:
459             test.keywords.visit(self)
460             self.end_test(test)
461
462     def start_test(self, test):
463         """Called when test starts.
464
465         :param test: Test to process.
466         :type test: Test
467         :returns: Nothing.
468         """
469
470         tags = [str(tag) for tag in test.tags]
471         test_result = dict()
472         test_result["name"] = test.name.lower()
473         test_result["parent"] = test.parent.name.lower()
474         test_result["tags"] = tags
475         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
476             replace('\r', '').replace('[', ' |br| [')
477         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
478         test_result["msg"] = test.message.replace('\n', ' |br| '). \
479             replace('\r', '').replace('"', "'")
480         test_result["status"] = test.status
481         if test.status == "PASS" and ("NDRPDRDISC" in tags or
482                                       "TCP" in tags or
483                                       "MRR" in tags or
484                                       "BMRR" in tags):
485             if "NDRDISC" in tags:
486                 test_type = "NDR"
487             elif "PDRDISC" in tags:
488                 test_type = "PDR"
489             elif "TCP" in tags:
490                 test_type = "TCP"
491             elif "MRR" in tags:
492                 test_type = "MRR"
493             elif "FRMOBL" in tags or "BMRR" in tags:
494                 test_type = "BMRR"
495             else:
496                 return
497
498             test_result["type"] = test_type
499
500             if test_type in ("NDR", "PDR"):
501                 try:
502                     rate_value = str(re.search(
503                         self.REGEX_RATE, test.message).group(1))
504                 except AttributeError:
505                     rate_value = "-1"
506                 try:
507                     rate_unit = str(re.search(
508                         self.REGEX_RATE, test.message).group(2))
509                 except AttributeError:
510                     rate_unit = "-1"
511
512                 test_result["throughput"] = dict()
513                 test_result["throughput"]["value"] = \
514                     int(rate_value.split('.')[0])
515                 test_result["throughput"]["unit"] = rate_unit
516                 test_result["latency"] = \
517                     self._get_latency(test.message, test_type)
518                 if test_type == "PDR":
519                     test_result["lossTolerance"] = str(re.search(
520                         self.REGEX_TOLERANCE, test.message).group(1))
521
522             elif test_type in ("TCP", ):
523                 groups = re.search(self.REGEX_TCP, test.message)
524                 test_result["result"] = dict()
525                 test_result["result"]["value"] = int(groups.group(2))
526                 test_result["result"]["unit"] = groups.group(1)
527
528             elif test_type in ("MRR", "BMRR"):
529                 test_result["result"] = dict()
530                 groups = re.search(self.REGEX_BMRR, test.message)
531                 if groups is not None:
532                     items_str = groups.group(1)
533                     items_float = [float(item.strip()) for item
534                                    in items_str.split(",")]
535                     test_result["result"]["receive-rate"] = \
536                         AvgStdevMetadataFactory.from_data(items_float)
537                 else:
538                     groups = re.search(self.REGEX_MRR, test.message)
539                     test_result["result"]["receive-rate"] = \
540                         AvgStdevMetadataFactory.from_data([
541                             float(groups.group(3)) / float(groups.group(1)), ])
542
543         self._test_ID = test.longname.lower()
544         self._data["tests"][self._test_ID] = test_result
545
546     def end_test(self, test):
547         """Called when test ends.
548
549         :param test: Test to process.
550         :type test: Test
551         :returns: Nothing.
552         """
553         pass
554
555     def visit_keyword(self, keyword):
556         """Implements traversing through the keyword and its child keywords.
557
558         :param keyword: Keyword to process.
559         :type keyword: Keyword
560         :returns: Nothing.
561         """
562         if self.start_keyword(keyword) is not False:
563             self.end_keyword(keyword)
564
565     def start_keyword(self, keyword):
566         """Called when keyword starts. Default implementation does nothing.
567
568         :param keyword: Keyword to process.
569         :type keyword: Keyword
570         :returns: Nothing.
571         """
572         try:
573             if keyword.type == "setup":
574                 self.visit_setup_kw(keyword)
575             elif keyword.type == "teardown":
576                 self._lookup_kw_nr = 0
577                 self.visit_teardown_kw(keyword)
578             else:
579                 self._lookup_kw_nr = 0
580                 self.visit_test_kw(keyword)
581         except AttributeError:
582             pass
583
584     def end_keyword(self, keyword):
585         """Called when keyword ends. Default implementation does nothing.
586
587         :param keyword: Keyword to process.
588         :type keyword: Keyword
589         :returns: Nothing.
590         """
591         pass
592
593     def visit_test_kw(self, test_kw):
594         """Implements traversing through the test keyword and its child
595         keywords.
596
597         :param test_kw: Keyword to process.
598         :type test_kw: Keyword
599         :returns: Nothing.
600         """
601         for keyword in test_kw.keywords:
602             if self.start_test_kw(keyword) is not False:
603                 self.visit_test_kw(keyword)
604                 self.end_test_kw(keyword)
605
606     def start_test_kw(self, test_kw):
607         """Called when test keyword starts. Default implementation does
608         nothing.
609
610         :param test_kw: Keyword to process.
611         :type test_kw: Keyword
612         :returns: Nothing.
613         """
614         if test_kw.name.count("Show Runtime Counters On All Duts"):
615             self._lookup_kw_nr += 1
616             self._show_run_lookup_nr = 0
617             self._msg_type = "test-show-runtime"
618         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
619             self._msg_type = "dpdk-version"
620         else:
621             return
622         test_kw.messages.visit(self)
623
624     def end_test_kw(self, test_kw):
625         """Called when keyword ends. Default implementation does nothing.
626
627         :param test_kw: Keyword to process.
628         :type test_kw: Keyword
629         :returns: Nothing.
630         """
631         pass
632
633     def visit_setup_kw(self, setup_kw):
634         """Implements traversing through the teardown keyword and its child
635         keywords.
636
637         :param setup_kw: Keyword to process.
638         :type setup_kw: Keyword
639         :returns: Nothing.
640         """
641         for keyword in setup_kw.keywords:
642             if self.start_setup_kw(keyword) is not False:
643                 self.visit_setup_kw(keyword)
644                 self.end_setup_kw(keyword)
645
646     def start_setup_kw(self, setup_kw):
647         """Called when teardown keyword starts. Default implementation does
648         nothing.
649
650         :param setup_kw: Keyword to process.
651         :type setup_kw: Keyword
652         :returns: Nothing.
653         """
654         if setup_kw.name.count("Show Vpp Version On All Duts") \
655                 and not self._version:
656             self._msg_type = "vpp-version"
657
658         elif setup_kw.name.count("Setup performance global Variables") \
659                 and not self._timestamp:
660             self._msg_type = "timestamp"
661         else:
662             return
663         setup_kw.messages.visit(self)
664
665     def end_setup_kw(self, setup_kw):
666         """Called when keyword ends. Default implementation does nothing.
667
668         :param setup_kw: Keyword to process.
669         :type setup_kw: Keyword
670         :returns: Nothing.
671         """
672         pass
673
674     def visit_teardown_kw(self, teardown_kw):
675         """Implements traversing through the teardown keyword and its child
676         keywords.
677
678         :param teardown_kw: Keyword to process.
679         :type teardown_kw: Keyword
680         :returns: Nothing.
681         """
682         for keyword in teardown_kw.keywords:
683             if self.start_teardown_kw(keyword) is not False:
684                 self.visit_teardown_kw(keyword)
685                 self.end_teardown_kw(keyword)
686
687     def start_teardown_kw(self, teardown_kw):
688         """Called when teardown keyword starts. Default implementation does
689         nothing.
690
691         :param teardown_kw: Keyword to process.
692         :type teardown_kw: Keyword
693         :returns: Nothing.
694         """
695
696         if teardown_kw.name.count("Show Vat History On All Duts"):
697             self._vat_history_lookup_nr = 0
698             self._msg_type = "teardown-vat-history"
699             teardown_kw.messages.visit(self)
700
701     def end_teardown_kw(self, teardown_kw):
702         """Called when keyword ends. Default implementation does nothing.
703
704         :param teardown_kw: Keyword to process.
705         :type teardown_kw: Keyword
706         :returns: Nothing.
707         """
708         pass
709
710     def visit_message(self, msg):
711         """Implements visiting the message.
712
713         :param msg: Message to process.
714         :type msg: Message
715         :returns: Nothing.
716         """
717         if self.start_message(msg) is not False:
718             self.end_message(msg)
719
720     def start_message(self, msg):
721         """Called when message starts. Get required information from messages:
722         - VPP version.
723
724         :param msg: Message to process.
725         :type msg: Message
726         :returns: Nothing.
727         """
728
729         if self._msg_type:
730             self.parse_msg[self._msg_type](msg)
731
732     def end_message(self, msg):
733         """Called when message ends. Default implementation does nothing.
734
735         :param msg: Message to process.
736         :type msg: Message
737         :returns: Nothing.
738         """
739         pass
740
741
742 class InputData(object):
743     """Input data
744
745     The data is extracted from output.xml files generated by Jenkins jobs and
746     stored in pandas' DataFrames.
747
748     The data structure:
749     - job name
750       - build number
751         - metadata
752           (as described in ExecutionChecker documentation)
753         - suites
754           (as described in ExecutionChecker documentation)
755         - tests
756           (as described in ExecutionChecker documentation)
757     """
758
759     def __init__(self, spec):
760         """Initialization.
761
762         :param spec: Specification.
763         :type spec: Specification
764         """
765
766         # Specification:
767         self._cfg = spec
768
769         # Data store:
770         self._input_data = pd.Series()
771
772     @property
773     def data(self):
774         """Getter - Input data.
775
776         :returns: Input data
777         :rtype: pandas.Series
778         """
779         return self._input_data
780
781     def metadata(self, job, build):
782         """Getter - metadata
783
784         :param job: Job which metadata we want.
785         :param build: Build which metadata we want.
786         :type job: str
787         :type build: str
788         :returns: Metadata
789         :rtype: pandas.Series
790         """
791
792         return self.data[job][build]["metadata"]
793
794     def suites(self, job, build):
795         """Getter - suites
796
797         :param job: Job which suites we want.
798         :param build: Build which suites we want.
799         :type job: str
800         :type build: str
801         :returns: Suites.
802         :rtype: pandas.Series
803         """
804
805         return self.data[job][str(build)]["suites"]
806
807     def tests(self, job, build):
808         """Getter - tests
809
810         :param job: Job which tests we want.
811         :param build: Build which tests we want.
812         :type job: str
813         :type build: str
814         :returns: Tests.
815         :rtype: pandas.Series
816         """
817
818         return self.data[job][build]["tests"]
819
820     @staticmethod
821     def _parse_tests(job, build, log):
822         """Process data from robot output.xml file and return JSON structured
823         data.
824
825         :param job: The name of job which build output data will be processed.
826         :param build: The build which output data will be processed.
827         :param log: List of log messages.
828         :type job: str
829         :type build: dict
830         :type log: list of tuples (severity, msg)
831         :returns: JSON data structure.
832         :rtype: dict
833         """
834
835         metadata = {
836             "job": job,
837             "build": build
838         }
839
840         with open(build["file-name"], 'r') as data_file:
841             try:
842                 result = ExecutionResult(data_file)
843             except errors.DataError as err:
844                 log.append(("ERROR", "Error occurred while parsing output.xml: "
845                                      "{0}".format(err)))
846                 return None
847         checker = ExecutionChecker(metadata)
848         result.visit(checker)
849
850         return checker.data
851
852     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
853         """Download and parse the input data file.
854
855         :param pid: PID of the process executing this method.
856         :param data_queue: Shared memory between processes. Queue which keeps
857             the result data. This data is then read by the main process and used
858             in further processing.
859         :param job: Name of the Jenkins job which generated the processed input
860             file.
861         :param build: Information about the Jenkins build which generated the
862             processed input file.
863         :param repeat: Repeat the download specified number of times if not
864             successful.
865         :type pid: int
866         :type data_queue: multiprocessing.Manager().Queue()
867         :type job: str
868         :type build: dict
869         :type repeat: int
870         """
871
872         logs = list()
873
874         logging.info("  Processing the job/build: {0}: {1}".
875                      format(job, build["build"]))
876
877         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
878                      format(job, build["build"])))
879
880         state = "failed"
881         success = False
882         data = None
883         do_repeat = repeat
884         while do_repeat:
885             success = download_and_unzip_data_file(self._cfg, job, build, pid,
886                                                    logs)
887             if success:
888                 break
889             do_repeat -= 1
890         if not success:
891             logs.append(("ERROR", "It is not possible to download the input "
892                                   "data file from the job '{job}', build "
893                                   "'{build}', or it is damaged. Skipped.".
894                          format(job=job, build=build["build"])))
895         if success:
896             logs.append(("INFO", "  Processing data from the build '{0}' ...".
897                          format(build["build"])))
898             data = InputData._parse_tests(job, build, logs)
899             if data is None:
900                 logs.append(("ERROR", "Input data file from the job '{job}', "
901                                       "build '{build}' is damaged. Skipped.".
902                              format(job=job, build=build["build"])))
903             else:
904                 state = "processed"
905
906             try:
907                 remove(build["file-name"])
908             except OSError as err:
909                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
910                              format(build["file-name"], err)))
911         logs.append(("INFO", "  Done."))
912
913         result = {
914             "data": data,
915             "state": state,
916             "job": job,
917             "build": build,
918             "logs": logs
919         }
920         data_queue.put(result)
921
922     def download_and_parse_data(self, repeat=1):
923         """Download the input data files, parse input data from input files and
924         store in pandas' Series.
925
926         :param repeat: Repeat the download specified number of times if not
927             successful.
928         :type repeat: int
929         """
930
931         logging.info("Downloading and parsing input files ...")
932
933         work_queue = multiprocessing.JoinableQueue()
934         manager = multiprocessing.Manager()
935         data_queue = manager.Queue()
936         cpus = multiprocessing.cpu_count()
937
938         workers = list()
939         for cpu in range(cpus):
940             worker = Worker(work_queue,
941                             data_queue,
942                             self._download_and_parse_build)
943             worker.daemon = True
944             worker.start()
945             workers.append(worker)
946             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
947                       format(cpu, worker.pid))
948
949         for job, builds in self._cfg.builds.items():
950             for build in builds:
951                 work_queue.put((job, build, repeat))
952
953         work_queue.join()
954
955         logging.info("Done.")
956
957         while not data_queue.empty():
958             result = data_queue.get()
959
960             job = result["job"]
961             build_nr = result["build"]["build"]
962
963             if result["data"]:
964                 data = result["data"]
965                 build_data = pd.Series({
966                     "metadata": pd.Series(data["metadata"].values(),
967                                           index=data["metadata"].keys()),
968                     "suites": pd.Series(data["suites"].values(),
969                                         index=data["suites"].keys()),
970                     "tests": pd.Series(data["tests"].values(),
971                                        index=data["tests"].keys())})
972
973                 if self._input_data.get(job, None) is None:
974                     self._input_data[job] = pd.Series()
975                 self._input_data[job][str(build_nr)] = build_data
976
977                 self._cfg.set_input_file_name(job, build_nr,
978                                               result["build"]["file-name"])
979
980             self._cfg.set_input_state(job, build_nr, result["state"])
981
982             for item in result["logs"]:
983                 if item[0] == "INFO":
984                     logging.info(item[1])
985                 elif item[0] == "ERROR":
986                     logging.error(item[1])
987                 elif item[0] == "DEBUG":
988                     logging.debug(item[1])
989                 elif item[0] == "CRITICAL":
990                     logging.critical(item[1])
991                 elif item[0] == "WARNING":
992                     logging.warning(item[1])
993
994         del data_queue
995
996         # Terminate all workers
997         for worker in workers:
998             worker.terminate()
999             worker.join()
1000
1001         logging.info("Done.")
1002
1003     @staticmethod
1004     def _end_of_tag(tag_filter, start=0, closer="'"):
1005         """Return the index of character in the string which is the end of tag.
1006
1007         :param tag_filter: The string where the end of tag is being searched.
1008         :param start: The index where the searching is stated.
1009         :param closer: The character which is the tag closer.
1010         :type tag_filter: str
1011         :type start: int
1012         :type closer: str
1013         :returns: The index of the tag closer.
1014         :rtype: int
1015         """
1016
1017         try:
1018             idx_opener = tag_filter.index(closer, start)
1019             return tag_filter.index(closer, idx_opener + 1)
1020         except ValueError:
1021             return None
1022
1023     @staticmethod
1024     def _condition(tag_filter):
1025         """Create a conditional statement from the given tag filter.
1026
1027         :param tag_filter: Filter based on tags from the element specification.
1028         :type tag_filter: str
1029         :returns: Conditional statement which can be evaluated.
1030         :rtype: str
1031         """
1032
1033         index = 0
1034         while True:
1035             index = InputData._end_of_tag(tag_filter, index)
1036             if index is None:
1037                 return tag_filter
1038             index += 1
1039             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1040
1041     def filter_data(self, element, params=None, data_set="tests",
1042                     continue_on_error=False):
1043         """Filter required data from the given jobs and builds.
1044
1045         The output data structure is:
1046
1047         - job 1
1048           - build 1
1049             - test (or suite) 1 ID:
1050               - param 1
1051               - param 2
1052               ...
1053               - param n
1054             ...
1055             - test (or suite) n ID:
1056             ...
1057           ...
1058           - build n
1059         ...
1060         - job n
1061
1062         :param element: Element which will use the filtered data.
1063         :param params: Parameters which will be included in the output. If None,
1064         all parameters are included.
1065         :param data_set: The set of data to be filtered: tests, suites,
1066         metadata.
1067         :param continue_on_error: Continue if there is error while reading the
1068         data. The Item will be empty then
1069         :type element: pandas.Series
1070         :type params: list
1071         :type data_set: str
1072         :type continue_on_error: bool
1073         :returns: Filtered data.
1074         :rtype pandas.Series
1075         """
1076
1077         try:
1078             if element["filter"] in ("all", "template"):
1079                 cond = "True"
1080             else:
1081                 cond = InputData._condition(element["filter"])
1082             logging.debug("   Filter: {0}".format(cond))
1083         except KeyError:
1084             logging.error("  No filter defined.")
1085             return None
1086
1087         if params is None:
1088             params = element.get("parameters", None)
1089
1090         data = pd.Series()
1091         try:
1092             for job, builds in element["data"].items():
1093                 data[job] = pd.Series()
1094                 for build in builds:
1095                     data[job][str(build)] = pd.Series()
1096                     try:
1097                         data_iter = self.data[job][str(build)][data_set].\
1098                             iteritems()
1099                     except KeyError:
1100                         if continue_on_error:
1101                             continue
1102                         else:
1103                             return None
1104                     for test_ID, test_data in data_iter:
1105                         if eval(cond, {"tags": test_data.get("tags", "")}):
1106                             data[job][str(build)][test_ID] = pd.Series()
1107                             if params is None:
1108                                 for param, val in test_data.items():
1109                                     data[job][str(build)][test_ID][param] = val
1110                             else:
1111                                 for param in params:
1112                                     try:
1113                                         data[job][str(build)][test_ID][param] =\
1114                                             test_data[param]
1115                                     except KeyError:
1116                                         data[job][str(build)][test_ID][param] =\
1117                                             "No Data"
1118             return data
1119
1120         except (KeyError, IndexError, ValueError) as err:
1121             logging.error("   Missing mandatory parameter in the element "
1122                           "specification: {0}".format(err))
1123             return None
1124         except AttributeError:
1125             return None
1126         except SyntaxError:
1127             logging.error("   The filter '{0}' is not correct. Check if all "
1128                           "tags are enclosed by apostrophes.".format(cond))
1129             return None
1130
1131     @staticmethod
1132     def merge_data(data):
1133         """Merge data from more jobs and builds to a simple data structure.
1134
1135         The output data structure is:
1136
1137         - test (suite) 1 ID:
1138           - param 1
1139           - param 2
1140           ...
1141           - param n
1142         ...
1143         - test (suite) n ID:
1144         ...
1145
1146         :param data: Data to merge.
1147         :type data: pandas.Series
1148         :returns: Merged data.
1149         :rtype: pandas.Series
1150         """
1151
1152         logging.info("    Merging data ...")
1153
1154         merged_data = pd.Series()
1155         for _, builds in data.iteritems():
1156             for _, item in builds.iteritems():
1157                 for ID, item_data in item.iteritems():
1158                     merged_data[ID] = item_data
1159
1160         return merged_data