PAL: Treat the new and the old TC names
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
34
35 from input_data_files import download_and_unzip_data_file
36 from utils import Worker
37
38
39 class ExecutionChecker(ResultVisitor):
40     """Class to traverse through the test suite structure.
41
42     The functionality implemented in this class generates a json structure:
43
44     Performance tests:
45
46     {
47         "metadata": {
48             "generated": "Timestamp",
49             "version": "SUT version",
50             "job": "Jenkins job name",
51             "build": "Information about the build"
52         },
53         "suites": {
54             "Suite long name 1": {
55                 "name": Suite name,
56                 "doc": "Suite 1 documentation",
57                 "parent": "Suite 1 parent",
58                 "level": "Level of the suite in the suite hierarchy"
59             }
60             "Suite long name N": {
61                 "name": Suite name,
62                 "doc": "Suite N documentation",
63                 "parent": "Suite 2 parent",
64                 "level": "Level of the suite in the suite hierarchy"
65             }
66         }
67         "tests": {
68             "ID": {
69                 "name": "Test name",
70                 "parent": "Name of the parent of the test",
71                 "doc": "Test documentation"
72                 "msg": "Test message"
73                 "tags": ["tag 1", "tag 2", "tag n"],
74                 "type": "PDR" | "NDR" | "TCP" | "MRR" | "BMRR",
75                 "throughput": {  # Only type: "PDR" | "NDR"
76                     "value": int,
77                     "unit": "pps" | "bps" | "percentage"
78                 },
79                 "latency": {  # Only type: "PDR" | "NDR"
80                     "direction1": {
81                         "100": {
82                             "min": int,
83                             "avg": int,
84                             "max": int
85                         },
86                         "50": {  # Only for NDR
87                             "min": int,
88                             "avg": int,
89                             "max": int
90                         },
91                         "10": {  # Only for NDR
92                             "min": int,
93                             "avg": int,
94                             "max": int
95                         }
96                     },
97                     "direction2": {
98                         "100": {
99                             "min": int,
100                             "avg": int,
101                             "max": int
102                         },
103                         "50": {  # Only for NDR
104                             "min": int,
105                             "avg": int,
106                             "max": int
107                         },
108                         "10": {  # Only for NDR
109                             "min": int,
110                             "avg": int,
111                             "max": int
112                         }
113                     }
114                 },
115                 "result": {  # Only type: "TCP"
116                     "value": int,
117                     "unit": "cps" | "rps"
118                 },
119                 "result": {  # Only type: "MRR" | "BMRR"
120                     "receive-rate": AvgStdevMetadata,
121                 },
122                 "lossTolerance": "lossTolerance",  # Only type: "PDR"
123                 "vat-history": "DUT1 and DUT2 VAT History"
124                 "show-run": "Show Run"
125             },
126             "ID" {
127                 # next test
128             }
129         }
130     }
131
132
133     Functional tests:
134
135     {
136         "metadata": {  # Optional
137             "version": "VPP version",
138             "job": "Jenkins job name",
139             "build": "Information about the build"
140         },
141         "suites": {
142             "Suite name 1": {
143                 "doc": "Suite 1 documentation",
144                 "parent": "Suite 1 parent",
145                 "level": "Level of the suite in the suite hierarchy"
146             }
147             "Suite name N": {
148                 "doc": "Suite N documentation",
149                 "parent": "Suite 2 parent",
150                 "level": "Level of the suite in the suite hierarchy"
151             }
152         }
153         "tests": {
154             "ID": {
155                 "name": "Test name",
156                 "parent": "Name of the parent of the test",
157                 "doc": "Test documentation"
158                 "msg": "Test message"
159                 "tags": ["tag 1", "tag 2", "tag n"],
160                 "vat-history": "DUT1 and DUT2 VAT History"
161                 "show-run": "Show Run"
162                 "status": "PASS" | "FAIL"
163             },
164             "ID" {
165                 # next test
166             }
167         }
168     }
169
170     .. note:: ID is the lowercase full path to the test.
171     """
172
173     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
174
175     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
176                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
177                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
178                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
179                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
180                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
181                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
182
183     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
184                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
185                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
186
187     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
188                                  r'[\D\d]*')
189
190     REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
191
192     REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
193                                     r"(RTE Version: 'DPDK )(.*)(')")
194
195     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
196
197     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
198                            r'tx\s(\d*),\srx\s(\d*)')
199
200     REGEX_BMRR = re.compile(r'Maximum Receive Rate Results \[(.*)\]')
201
202     REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
203
204     REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')
205
206     REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
207
208     def __init__(self, metadata):
209         """Initialisation.
210
211         :param metadata: Key-value pairs to be included in "metadata" part of
212         JSON structure.
213         :type metadata: dict
214         """
215
216         # Type of message to parse out from the test messages
217         self._msg_type = None
218
219         # VPP version
220         self._version = None
221
222         # Timestamp
223         self._timestamp = None
224
225         # Number of VAT History messages found:
226         # 0 - no message
227         # 1 - VAT History of DUT1
228         # 2 - VAT History of DUT2
229         self._lookup_kw_nr = 0
230         self._vat_history_lookup_nr = 0
231
232         # Number of Show Running messages found
233         # 0 - no message
234         # 1 - Show run message found
235         self._show_run_lookup_nr = 0
236
237         # Test ID of currently processed test- the lowercase full path to the
238         # test
239         self._test_ID = None
240
241         # The main data structure
242         self._data = {
243             "metadata": OrderedDict(),
244             "suites": OrderedDict(),
245             "tests": OrderedDict()
246         }
247
248         # Save the provided metadata
249         for key, val in metadata.items():
250             self._data["metadata"][key] = val
251
252         # Dictionary defining the methods used to parse different types of
253         # messages
254         self.parse_msg = {
255             "timestamp": self._get_timestamp,
256             "vpp-version": self._get_vpp_version,
257             "dpdk-version": self._get_dpdk_version,
258             "teardown-vat-history": self._get_vat_history,
259             "test-show-runtime": self._get_show_run
260         }
261
262     @property
263     def data(self):
264         """Getter - Data parsed from the XML file.
265
266         :returns: Data parsed from the XML file.
267         :rtype: dict
268         """
269         return self._data
270
271     def _get_vpp_version(self, msg):
272         """Called when extraction of VPP version is required.
273
274         :param msg: Message to process.
275         :type msg: Message
276         :returns: Nothing.
277         """
278
279         if msg.message.count("return STDOUT Version:"):
280             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
281                                 group(2))
282             self._data["metadata"]["version"] = self._version
283             self._msg_type = None
284
285     def _get_dpdk_version(self, msg):
286         """Called when extraction of DPDK version is required.
287
288         :param msg: Message to process.
289         :type msg: Message
290         :returns: Nothing.
291         """
292
293         if msg.message.count("return STDOUT testpmd"):
294             try:
295                 self._version = str(re.search(
296                     self.REGEX_VERSION_DPDK, msg.message). group(4))
297                 self._data["metadata"]["version"] = self._version
298             except IndexError:
299                 pass
300             finally:
301                 self._msg_type = None
302
303     def _get_timestamp(self, msg):
304         """Called when extraction of timestamp is required.
305
306         :param msg: Message to process.
307         :type msg: Message
308         :returns: Nothing.
309         """
310
311         self._timestamp = msg.timestamp[:14]
312         self._data["metadata"]["generated"] = self._timestamp
313         self._msg_type = None
314
315     def _get_vat_history(self, msg):
316         """Called when extraction of VAT command history is required.
317
318         :param msg: Message to process.
319         :type msg: Message
320         :returns: Nothing.
321         """
322         if msg.message.count("VAT command history:"):
323             self._vat_history_lookup_nr += 1
324             if self._vat_history_lookup_nr == 1:
325                 self._data["tests"][self._test_ID]["vat-history"] = str()
326             else:
327                 self._msg_type = None
328             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
329                           "VAT command history:", "", msg.message, count=1). \
330                 replace("\n\n", "\n").replace('\n', ' |br| ').\
331                 replace('\r', '').replace('"', "'")
332
333             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
334             self._data["tests"][self._test_ID]["vat-history"] += \
335                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
336
337     def _get_show_run(self, msg):
338         """Called when extraction of VPP operational data (output of CLI command
339         Show Runtime) is required.
340
341         :param msg: Message to process.
342         :type msg: Message
343         :returns: Nothing.
344         """
345         if msg.message.count("return STDOUT Thread "):
346             self._show_run_lookup_nr += 1
347             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
348                 self._data["tests"][self._test_ID]["show-run"] = str()
349             if self._lookup_kw_nr > 1:
350                 self._msg_type = None
351             if self._show_run_lookup_nr == 1:
352                 text = msg.message.replace("vat# ", "").\
353                     replace("return STDOUT ", "").replace("\n\n", "\n").\
354                     replace('\n', ' |br| ').\
355                     replace('\r', '').replace('"', "'")
356                 try:
357                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
358                     self._data["tests"][self._test_ID]["show-run"] += \
359                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
360                 except KeyError:
361                     pass
362
363     def _get_latency(self, msg, test_type):
364         """Get the latency data from the test message.
365
366         :param msg: Message to be parsed.
367         :param test_type: Type of the test - NDR or PDR.
368         :type msg: str
369         :type test_type: str
370         :returns: Latencies parsed from the message.
371         :rtype: dict
372         """
373
374         if test_type == "NDR":
375             groups = re.search(self.REGEX_LAT_NDR, msg)
376             groups_range = range(1, 7)
377         elif test_type == "PDR":
378             groups = re.search(self.REGEX_LAT_PDR, msg)
379             groups_range = range(1, 3)
380         else:
381             return {}
382
383         latencies = list()
384         for idx in groups_range:
385             try:
386                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
387             except (AttributeError, ValueError):
388                 lat = [-1, -1, -1]
389             latencies.append(lat)
390
391         keys = ("min", "avg", "max")
392         latency = {
393             "direction1": {
394             },
395             "direction2": {
396             }
397         }
398
399         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
400         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
401         if test_type == "NDR":
402             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
403             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
404             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
405             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
406
407         return latency
408
409     def visit_suite(self, suite):
410         """Implements traversing through the suite and its direct children.
411
412         :param suite: Suite to process.
413         :type suite: Suite
414         :returns: Nothing.
415         """
416         if self.start_suite(suite) is not False:
417             suite.suites.visit(self)
418             suite.tests.visit(self)
419             self.end_suite(suite)
420
421     def start_suite(self, suite):
422         """Called when suite starts.
423
424         :param suite: Suite to process.
425         :type suite: Suite
426         :returns: Nothing.
427         """
428
429         try:
430             parent_name = suite.parent.name
431         except AttributeError:
432             return
433
434         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
435             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
436         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
437
438         self._data["suites"][suite.longname.lower().replace('"', "'").
439             replace(" ", "_")] = {
440                 "name": suite.name.lower(),
441                 "doc": doc_str,
442                 "parent": parent_name,
443                 "level": len(suite.longname.split("."))
444             }
445
446         suite.keywords.visit(self)
447
448     def end_suite(self, suite):
449         """Called when suite ends.
450
451         :param suite: Suite to process.
452         :type suite: Suite
453         :returns: Nothing.
454         """
455         pass
456
457     def visit_test(self, test):
458         """Implements traversing through the test.
459
460         :param test: Test to process.
461         :type test: Test
462         :returns: Nothing.
463         """
464         if self.start_test(test) is not False:
465             test.keywords.visit(self)
466             self.end_test(test)
467
468     def start_test(self, test):
469         """Called when test starts.
470
471         :param test: Test to process.
472         :type test: Test
473         :returns: Nothing.
474         """
475
476         tags = [str(tag) for tag in test.tags]
477         test_result = dict()
478         test_result["name"] = test.name.lower()
479         test_result["parent"] = test.parent.name.lower()
480         test_result["tags"] = tags
481         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
482             replace('\r', '').replace('[', ' |br| [')
483         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
484         test_result["msg"] = test.message.replace('\n', ' |br| '). \
485             replace('\r', '').replace('"', "'")
486         test_result["status"] = test.status
487         if test.status == "PASS" and ("NDRPDRDISC" in tags or
488                                       "TCP" in tags or
489                                       "MRR" in tags or
490                                       "BMRR" in tags):
491             if "NDRDISC" in tags:
492                 test_type = "NDR"
493             elif "PDRDISC" in tags:
494                 test_type = "PDR"
495             elif "TCP" in tags:
496                 test_type = "TCP"
497             elif "MRR" in tags:
498                 test_type = "MRR"
499             elif "FRMOBL" in tags or "BMRR" in tags:
500                 test_type = "BMRR"
501             else:
502                 return
503
504             test_result["type"] = test_type
505
506             if test_type in ("NDR", "PDR"):
507                 try:
508                     rate_value = str(re.search(
509                         self.REGEX_RATE, test.message).group(1))
510                 except AttributeError:
511                     rate_value = "-1"
512                 try:
513                     rate_unit = str(re.search(
514                         self.REGEX_RATE, test.message).group(2))
515                 except AttributeError:
516                     rate_unit = "-1"
517
518                 test_result["throughput"] = dict()
519                 test_result["throughput"]["value"] = \
520                     int(rate_value.split('.')[0])
521                 test_result["throughput"]["unit"] = rate_unit
522                 test_result["latency"] = \
523                     self._get_latency(test.message, test_type)
524                 if test_type == "PDR":
525                     test_result["lossTolerance"] = str(re.search(
526                         self.REGEX_TOLERANCE, test.message).group(1))
527
528             elif test_type in ("TCP", ):
529                 groups = re.search(self.REGEX_TCP, test.message)
530                 test_result["result"] = dict()
531                 test_result["result"]["value"] = int(groups.group(2))
532                 test_result["result"]["unit"] = groups.group(1)
533
534             elif test_type in ("MRR", "BMRR"):
535                 test_result["result"] = dict()
536                 groups = re.search(self.REGEX_BMRR, test.message)
537                 if groups is not None:
538                     items_str = groups.group(1)
539                     items_float = [float(item.strip()) for item
540                                    in items_str.split(",")]
541                     test_result["result"]["receive-rate"] = \
542                         AvgStdevMetadataFactory.from_data(items_float)
543                 else:
544                     groups = re.search(self.REGEX_MRR, test.message)
545                     test_result["result"]["receive-rate"] = \
546                         AvgStdevMetadataFactory.from_data([
547                             float(groups.group(3)) / float(groups.group(1)), ])
548
549         self._test_ID = test.longname.lower()
550         self._data["tests"][self._test_ID] = test_result
551
552     def end_test(self, test):
553         """Called when test ends.
554
555         :param test: Test to process.
556         :type test: Test
557         :returns: Nothing.
558         """
559         pass
560
561     def visit_keyword(self, keyword):
562         """Implements traversing through the keyword and its child keywords.
563
564         :param keyword: Keyword to process.
565         :type keyword: Keyword
566         :returns: Nothing.
567         """
568         if self.start_keyword(keyword) is not False:
569             self.end_keyword(keyword)
570
571     def start_keyword(self, keyword):
572         """Called when keyword starts. Default implementation does nothing.
573
574         :param keyword: Keyword to process.
575         :type keyword: Keyword
576         :returns: Nothing.
577         """
578         try:
579             if keyword.type == "setup":
580                 self.visit_setup_kw(keyword)
581             elif keyword.type == "teardown":
582                 self._lookup_kw_nr = 0
583                 self.visit_teardown_kw(keyword)
584             else:
585                 self._lookup_kw_nr = 0
586                 self.visit_test_kw(keyword)
587         except AttributeError:
588             pass
589
590     def end_keyword(self, keyword):
591         """Called when keyword ends. Default implementation does nothing.
592
593         :param keyword: Keyword to process.
594         :type keyword: Keyword
595         :returns: Nothing.
596         """
597         pass
598
599     def visit_test_kw(self, test_kw):
600         """Implements traversing through the test keyword and its child
601         keywords.
602
603         :param test_kw: Keyword to process.
604         :type test_kw: Keyword
605         :returns: Nothing.
606         """
607         for keyword in test_kw.keywords:
608             if self.start_test_kw(keyword) is not False:
609                 self.visit_test_kw(keyword)
610                 self.end_test_kw(keyword)
611
612     def start_test_kw(self, test_kw):
613         """Called when test keyword starts. Default implementation does
614         nothing.
615
616         :param test_kw: Keyword to process.
617         :type test_kw: Keyword
618         :returns: Nothing.
619         """
620         if test_kw.name.count("Show Runtime Counters On All Duts"):
621             self._lookup_kw_nr += 1
622             self._show_run_lookup_nr = 0
623             self._msg_type = "test-show-runtime"
624         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
625             self._msg_type = "dpdk-version"
626         else:
627             return
628         test_kw.messages.visit(self)
629
630     def end_test_kw(self, test_kw):
631         """Called when keyword ends. Default implementation does nothing.
632
633         :param test_kw: Keyword to process.
634         :type test_kw: Keyword
635         :returns: Nothing.
636         """
637         pass
638
639     def visit_setup_kw(self, setup_kw):
640         """Implements traversing through the teardown keyword and its child
641         keywords.
642
643         :param setup_kw: Keyword to process.
644         :type setup_kw: Keyword
645         :returns: Nothing.
646         """
647         for keyword in setup_kw.keywords:
648             if self.start_setup_kw(keyword) is not False:
649                 self.visit_setup_kw(keyword)
650                 self.end_setup_kw(keyword)
651
652     def start_setup_kw(self, setup_kw):
653         """Called when teardown keyword starts. Default implementation does
654         nothing.
655
656         :param setup_kw: Keyword to process.
657         :type setup_kw: Keyword
658         :returns: Nothing.
659         """
660         if setup_kw.name.count("Show Vpp Version On All Duts") \
661                 and not self._version:
662             self._msg_type = "vpp-version"
663
664         elif setup_kw.name.count("Setup performance global Variables") \
665                 and not self._timestamp:
666             self._msg_type = "timestamp"
667         else:
668             return
669         setup_kw.messages.visit(self)
670
671     def end_setup_kw(self, setup_kw):
672         """Called when keyword ends. Default implementation does nothing.
673
674         :param setup_kw: Keyword to process.
675         :type setup_kw: Keyword
676         :returns: Nothing.
677         """
678         pass
679
680     def visit_teardown_kw(self, teardown_kw):
681         """Implements traversing through the teardown keyword and its child
682         keywords.
683
684         :param teardown_kw: Keyword to process.
685         :type teardown_kw: Keyword
686         :returns: Nothing.
687         """
688         for keyword in teardown_kw.keywords:
689             if self.start_teardown_kw(keyword) is not False:
690                 self.visit_teardown_kw(keyword)
691                 self.end_teardown_kw(keyword)
692
693     def start_teardown_kw(self, teardown_kw):
694         """Called when teardown keyword starts. Default implementation does
695         nothing.
696
697         :param teardown_kw: Keyword to process.
698         :type teardown_kw: Keyword
699         :returns: Nothing.
700         """
701
702         if teardown_kw.name.count("Show Vat History On All Duts"):
703             self._vat_history_lookup_nr = 0
704             self._msg_type = "teardown-vat-history"
705             teardown_kw.messages.visit(self)
706
707     def end_teardown_kw(self, teardown_kw):
708         """Called when keyword ends. Default implementation does nothing.
709
710         :param teardown_kw: Keyword to process.
711         :type teardown_kw: Keyword
712         :returns: Nothing.
713         """
714         pass
715
716     def visit_message(self, msg):
717         """Implements visiting the message.
718
719         :param msg: Message to process.
720         :type msg: Message
721         :returns: Nothing.
722         """
723         if self.start_message(msg) is not False:
724             self.end_message(msg)
725
726     def start_message(self, msg):
727         """Called when message starts. Get required information from messages:
728         - VPP version.
729
730         :param msg: Message to process.
731         :type msg: Message
732         :returns: Nothing.
733         """
734
735         if self._msg_type:
736             self.parse_msg[self._msg_type](msg)
737
738     def end_message(self, msg):
739         """Called when message ends. Default implementation does nothing.
740
741         :param msg: Message to process.
742         :type msg: Message
743         :returns: Nothing.
744         """
745         pass
746
747
748 class InputData(object):
749     """Input data
750
751     The data is extracted from output.xml files generated by Jenkins jobs and
752     stored in pandas' DataFrames.
753
754     The data structure:
755     - job name
756       - build number
757         - metadata
758           (as described in ExecutionChecker documentation)
759         - suites
760           (as described in ExecutionChecker documentation)
761         - tests
762           (as described in ExecutionChecker documentation)
763     """
764
765     def __init__(self, spec):
766         """Initialization.
767
768         :param spec: Specification.
769         :type spec: Specification
770         """
771
772         # Specification:
773         self._cfg = spec
774
775         # Data store:
776         self._input_data = pd.Series()
777
778     @property
779     def data(self):
780         """Getter - Input data.
781
782         :returns: Input data
783         :rtype: pandas.Series
784         """
785         return self._input_data
786
787     def metadata(self, job, build):
788         """Getter - metadata
789
790         :param job: Job which metadata we want.
791         :param build: Build which metadata we want.
792         :type job: str
793         :type build: str
794         :returns: Metadata
795         :rtype: pandas.Series
796         """
797
798         return self.data[job][build]["metadata"]
799
800     def suites(self, job, build):
801         """Getter - suites
802
803         :param job: Job which suites we want.
804         :param build: Build which suites we want.
805         :type job: str
806         :type build: str
807         :returns: Suites.
808         :rtype: pandas.Series
809         """
810
811         return self.data[job][str(build)]["suites"]
812
813     def tests(self, job, build):
814         """Getter - tests
815
816         :param job: Job which tests we want.
817         :param build: Build which tests we want.
818         :type job: str
819         :type build: str
820         :returns: Tests.
821         :rtype: pandas.Series
822         """
823
824         return self.data[job][build]["tests"]
825
826     @staticmethod
827     def _parse_tests(job, build, log):
828         """Process data from robot output.xml file and return JSON structured
829         data.
830
831         :param job: The name of job which build output data will be processed.
832         :param build: The build which output data will be processed.
833         :param log: List of log messages.
834         :type job: str
835         :type build: dict
836         :type log: list of tuples (severity, msg)
837         :returns: JSON data structure.
838         :rtype: dict
839         """
840
841         metadata = {
842             "job": job,
843             "build": build
844         }
845
846         with open(build["file-name"], 'r') as data_file:
847             try:
848                 result = ExecutionResult(data_file)
849             except errors.DataError as err:
850                 log.append(("ERROR", "Error occurred while parsing output.xml: "
851                                      "{0}".format(err)))
852                 return None
853         checker = ExecutionChecker(metadata)
854         result.visit(checker)
855
856         return checker.data
857
858     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
859         """Download and parse the input data file.
860
861         :param pid: PID of the process executing this method.
862         :param data_queue: Shared memory between processes. Queue which keeps
863             the result data. This data is then read by the main process and used
864             in further processing.
865         :param job: Name of the Jenkins job which generated the processed input
866             file.
867         :param build: Information about the Jenkins build which generated the
868             processed input file.
869         :param repeat: Repeat the download specified number of times if not
870             successful.
871         :type pid: int
872         :type data_queue: multiprocessing.Manager().Queue()
873         :type job: str
874         :type build: dict
875         :type repeat: int
876         """
877
878         logs = list()
879
880         logging.info("  Processing the job/build: {0}: {1}".
881                      format(job, build["build"]))
882
883         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
884                      format(job, build["build"])))
885
886         state = "failed"
887         success = False
888         data = None
889         do_repeat = repeat
890         while do_repeat:
891             success = download_and_unzip_data_file(self._cfg, job, build, pid,
892                                                    logs)
893             if success:
894                 break
895             do_repeat -= 1
896         if not success:
897             logs.append(("ERROR", "It is not possible to download the input "
898                                   "data file from the job '{job}', build "
899                                   "'{build}', or it is damaged. Skipped.".
900                          format(job=job, build=build["build"])))
901         if success:
902             logs.append(("INFO", "  Processing data from the build '{0}' ...".
903                          format(build["build"])))
904             data = InputData._parse_tests(job, build, logs)
905             if data is None:
906                 logs.append(("ERROR", "Input data file from the job '{job}', "
907                                       "build '{build}' is damaged. Skipped.".
908                              format(job=job, build=build["build"])))
909             else:
910                 state = "processed"
911
912             try:
913                 remove(build["file-name"])
914             except OSError as err:
915                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
916                              format(build["file-name"], err)))
917         logs.append(("INFO", "  Done."))
918
919         result = {
920             "data": data,
921             "state": state,
922             "job": job,
923             "build": build,
924             "logs": logs
925         }
926         data_queue.put(result)
927
928     def download_and_parse_data(self, repeat=1):
929         """Download the input data files, parse input data from input files and
930         store in pandas' Series.
931
932         :param repeat: Repeat the download specified number of times if not
933             successful.
934         :type repeat: int
935         """
936
937         logging.info("Downloading and parsing input files ...")
938
939         work_queue = multiprocessing.JoinableQueue()
940         manager = multiprocessing.Manager()
941         data_queue = manager.Queue()
942         cpus = multiprocessing.cpu_count()
943
944         workers = list()
945         for cpu in range(cpus):
946             worker = Worker(work_queue,
947                             data_queue,
948                             self._download_and_parse_build)
949             worker.daemon = True
950             worker.start()
951             workers.append(worker)
952             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
953                       format(cpu, worker.pid))
954
955         for job, builds in self._cfg.builds.items():
956             for build in builds:
957                 work_queue.put((job, build, repeat))
958
959         work_queue.join()
960
961         logging.info("Done.")
962
963         while not data_queue.empty():
964             result = data_queue.get()
965
966             job = result["job"]
967             build_nr = result["build"]["build"]
968
969             if result["data"]:
970                 data = result["data"]
971                 build_data = pd.Series({
972                     "metadata": pd.Series(data["metadata"].values(),
973                                           index=data["metadata"].keys()),
974                     "suites": pd.Series(data["suites"].values(),
975                                         index=data["suites"].keys()),
976                     "tests": pd.Series(data["tests"].values(),
977                                        index=data["tests"].keys())})
978
979                 if self._input_data.get(job, None) is None:
980                     self._input_data[job] = pd.Series()
981                 self._input_data[job][str(build_nr)] = build_data
982
983                 self._cfg.set_input_file_name(job, build_nr,
984                                               result["build"]["file-name"])
985
986             self._cfg.set_input_state(job, build_nr, result["state"])
987
988             for item in result["logs"]:
989                 if item[0] == "INFO":
990                     logging.info(item[1])
991                 elif item[0] == "ERROR":
992                     logging.error(item[1])
993                 elif item[0] == "DEBUG":
994                     logging.debug(item[1])
995                 elif item[0] == "CRITICAL":
996                     logging.critical(item[1])
997                 elif item[0] == "WARNING":
998                     logging.warning(item[1])
999
1000         del data_queue
1001
1002         # Terminate all workers
1003         for worker in workers:
1004             worker.terminate()
1005             worker.join()
1006
1007         logging.info("Done.")
1008
1009     @staticmethod
1010     def _end_of_tag(tag_filter, start=0, closer="'"):
1011         """Return the index of character in the string which is the end of tag.
1012
1013         :param tag_filter: The string where the end of tag is being searched.
1014         :param start: The index where the searching is stated.
1015         :param closer: The character which is the tag closer.
1016         :type tag_filter: str
1017         :type start: int
1018         :type closer: str
1019         :returns: The index of the tag closer.
1020         :rtype: int
1021         """
1022
1023         try:
1024             idx_opener = tag_filter.index(closer, start)
1025             return tag_filter.index(closer, idx_opener + 1)
1026         except ValueError:
1027             return None
1028
1029     @staticmethod
1030     def _condition(tag_filter):
1031         """Create a conditional statement from the given tag filter.
1032
1033         :param tag_filter: Filter based on tags from the element specification.
1034         :type tag_filter: str
1035         :returns: Conditional statement which can be evaluated.
1036         :rtype: str
1037         """
1038
1039         index = 0
1040         while True:
1041             index = InputData._end_of_tag(tag_filter, index)
1042             if index is None:
1043                 return tag_filter
1044             index += 1
1045             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1046
1047     def filter_data(self, element, params=None, data_set="tests",
1048                     continue_on_error=False):
1049         """Filter required data from the given jobs and builds.
1050
1051         The output data structure is:
1052
1053         - job 1
1054           - build 1
1055             - test (or suite) 1 ID:
1056               - param 1
1057               - param 2
1058               ...
1059               - param n
1060             ...
1061             - test (or suite) n ID:
1062             ...
1063           ...
1064           - build n
1065         ...
1066         - job n
1067
1068         :param element: Element which will use the filtered data.
1069         :param params: Parameters which will be included in the output. If None,
1070         all parameters are included.
1071         :param data_set: The set of data to be filtered: tests, suites,
1072         metadata.
1073         :param continue_on_error: Continue if there is error while reading the
1074         data. The Item will be empty then
1075         :type element: pandas.Series
1076         :type params: list
1077         :type data_set: str
1078         :type continue_on_error: bool
1079         :returns: Filtered data.
1080         :rtype pandas.Series
1081         """
1082
1083         try:
1084             if element["filter"] in ("all", "template"):
1085                 cond = "True"
1086             else:
1087                 cond = InputData._condition(element["filter"])
1088             logging.debug("   Filter: {0}".format(cond))
1089         except KeyError:
1090             logging.error("  No filter defined.")
1091             return None
1092
1093         if params is None:
1094             params = element.get("parameters", None)
1095
1096         data = pd.Series()
1097         try:
1098             for job, builds in element["data"].items():
1099                 data[job] = pd.Series()
1100                 for build in builds:
1101                     data[job][str(build)] = pd.Series()
1102                     try:
1103                         data_iter = self.data[job][str(build)][data_set].\
1104                             iteritems()
1105                     except KeyError:
1106                         if continue_on_error:
1107                             continue
1108                         else:
1109                             return None
1110                     for test_ID, test_data in data_iter:
1111                         if eval(cond, {"tags": test_data.get("tags", "")}):
1112                             data[job][str(build)][test_ID] = pd.Series()
1113                             if params is None:
1114                                 for param, val in test_data.items():
1115                                     data[job][str(build)][test_ID][param] = val
1116                             else:
1117                                 for param in params:
1118                                     try:
1119                                         data[job][str(build)][test_ID][param] =\
1120                                             test_data[param]
1121                                     except KeyError:
1122                                         data[job][str(build)][test_ID][param] =\
1123                                             "No Data"
1124             return data
1125
1126         except (KeyError, IndexError, ValueError) as err:
1127             logging.error("   Missing mandatory parameter in the element "
1128                           "specification: {0}".format(err))
1129             return None
1130         except AttributeError:
1131             return None
1132         except SyntaxError:
1133             logging.error("   The filter '{0}' is not correct. Check if all "
1134                           "tags are enclosed by apostrophes.".format(cond))
1135             return None
1136
1137     @staticmethod
1138     def merge_data(data):
1139         """Merge data from more jobs and builds to a simple data structure.
1140
1141         The output data structure is:
1142
1143         - test (suite) 1 ID:
1144           - param 1
1145           - param 2
1146           ...
1147           - param n
1148         ...
1149         - test (suite) n ID:
1150         ...
1151
1152         :param data: Data to merge.
1153         :type data: pandas.Series
1154         :returns: Merged data.
1155         :rtype: pandas.Series
1156         """
1157
1158         logging.info("    Merging data ...")
1159
1160         merged_data = pd.Series()
1161         for _, builds in data.iteritems():
1162             for _, item in builds.iteritems():
1163                 for ID, item_data in item.iteritems():
1164                     merged_data[ID] = item_data
1165
1166         return merged_data