CSIT-1101: Optimize input data processing
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 """
20
21 import multiprocessing
22 import os
23 import re
24 import pandas as pd
25 import logging
26
27 from robot.api import ExecutionResult, ResultVisitor
28 from robot import errors
29 from collections import OrderedDict
30 from string import replace
31 from os import remove
32
33 from input_data_files import download_and_unzip_data_file
34
35
36 class ExecutionChecker(ResultVisitor):
37     """Class to traverse through the test suite structure.
38
39     The functionality implemented in this class generates a json structure:
40
41     Performance tests:
42
43     {
44         "metadata": {  # Optional
45             "version": "VPP version",
46             "job": "Jenkins job name",
47             "build": "Information about the build"
48         },
49         "suites": {
50             "Suite name 1": {
51                 "doc": "Suite 1 documentation",
52                 "parent": "Suite 1 parent",
53                 "level": "Level of the suite in the suite hierarchy"
54             }
55             "Suite name N": {
56                 "doc": "Suite N documentation",
57                 "parent": "Suite 2 parent",
58                 "level": "Level of the suite in the suite hierarchy"
59             }
60         }
61         "tests": {
62             "ID": {
63                 "name": "Test name",
64                 "parent": "Name of the parent of the test",
65                 "doc": "Test documentation"
66                 "msg": "Test message"
67                 "tags": ["tag 1", "tag 2", "tag n"],
68                 "type": "PDR" | "NDR",
69                 "throughput": {
70                     "value": int,
71                     "unit": "pps" | "bps" | "percentage"
72                 },
73                 "latency": {
74                     "direction1": {
75                         "100": {
76                             "min": int,
77                             "avg": int,
78                             "max": int
79                         },
80                         "50": {  # Only for NDR
81                             "min": int,
82                             "avg": int,
83                             "max": int
84                         },
85                         "10": {  # Only for NDR
86                             "min": int,
87                             "avg": int,
88                             "max": int
89                         }
90                     },
91                     "direction2": {
92                         "100": {
93                             "min": int,
94                             "avg": int,
95                             "max": int
96                         },
97                         "50": {  # Only for NDR
98                             "min": int,
99                             "avg": int,
100                             "max": int
101                         },
102                         "10": {  # Only for NDR
103                             "min": int,
104                             "avg": int,
105                             "max": int
106                         }
107                     }
108                 },
109                 "lossTolerance": "lossTolerance",  # Only for PDR
110                 "vat-history": "DUT1 and DUT2 VAT History"
111                 },
112                 "show-run": "Show Run"
113             },
114             "ID" {
115                 # next test
116             }
117         }
118     }
119
120     Functional tests:
121
122
123     {
124         "metadata": {  # Optional
125             "version": "VPP version",
126             "job": "Jenkins job name",
127             "build": "Information about the build"
128         },
129         "suites": {
130             "Suite name 1": {
131                 "doc": "Suite 1 documentation",
132                 "parent": "Suite 1 parent",
133                 "level": "Level of the suite in the suite hierarchy"
134             }
135             "Suite name N": {
136                 "doc": "Suite N documentation",
137                 "parent": "Suite 2 parent",
138                 "level": "Level of the suite in the suite hierarchy"
139             }
140         }
141         "tests": {
142             "ID": {
143                 "name": "Test name",
144                 "parent": "Name of the parent of the test",
145                 "doc": "Test documentation"
146                 "msg": "Test message"
147                 "tags": ["tag 1", "tag 2", "tag n"],
148                 "vat-history": "DUT1 and DUT2 VAT History"
149                 "show-run": "Show Run"
150                 "status": "PASS" | "FAIL"
151             },
152             "ID" {
153                 # next test
154             }
155         }
156     }
157
158     .. note:: ID is the lowercase full path to the test.
159     """
160
161     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
162
163     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
164                                r'LAT_\d+%NDR:\s\[\'(-?\d+\/-?\d+/-?\d+)\','
165                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
166                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
167                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
168                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
169                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
170
171     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
172                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
173                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
174
175     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
176                                  r'[\D\d]*')
177
178     REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)")
179
180     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
181
182     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
183                            r'tx\s(\d*),\srx\s(\d*)')
184
185     def __init__(self, metadata):
186         """Initialisation.
187
188         :param metadata: Key-value pairs to be included in "metadata" part of
189         JSON structure.
190         :type metadata: dict
191         """
192
193         # Type of message to parse out from the test messages
194         self._msg_type = None
195
196         # VPP version
197         self._version = None
198
199         # Number of VAT History messages found:
200         # 0 - no message
201         # 1 - VAT History of DUT1
202         # 2 - VAT History of DUT2
203         self._lookup_kw_nr = 0
204         self._vat_history_lookup_nr = 0
205
206         # Number of Show Running messages found
207         # 0 - no message
208         # 1 - Show run message found
209         self._show_run_lookup_nr = 0
210
211         # Test ID of currently processed test- the lowercase full path to the
212         # test
213         self._test_ID = None
214
215         # The main data structure
216         self._data = {
217             "metadata": OrderedDict(),
218             "suites": OrderedDict(),
219             "tests": OrderedDict()
220         }
221
222         # Save the provided metadata
223         for key, val in metadata.items():
224             self._data["metadata"][key] = val
225
226         # Dictionary defining the methods used to parse different types of
227         # messages
228         self.parse_msg = {
229             "setup-version": self._get_version,
230             "teardown-vat-history": self._get_vat_history,
231             "test-show-runtime": self._get_show_run
232         }
233
234     @property
235     def data(self):
236         """Getter - Data parsed from the XML file.
237
238         :returns: Data parsed from the XML file.
239         :rtype: dict
240         """
241         return self._data
242
243     def _get_version(self, msg):
244         """Called when extraction of VPP version is required.
245
246         :param msg: Message to process.
247         :type msg: Message
248         :returns: Nothing.
249         """
250
251         if msg.message.count("return STDOUT Version:"):
252             self._version = str(re.search(self.REGEX_VERSION, msg.message).
253                                 group(2))
254             self._data["metadata"]["version"] = self._version
255             self._data["metadata"]["generated"] = msg.timestamp
256             self._msg_type = None
257
258     def _get_vat_history(self, msg):
259         """Called when extraction of VAT command history is required.
260
261         :param msg: Message to process.
262         :type msg: Message
263         :returns: Nothing.
264         """
265         if msg.message.count("VAT command history:"):
266             self._vat_history_lookup_nr += 1
267             if self._vat_history_lookup_nr == 1:
268                 self._data["tests"][self._test_ID]["vat-history"] = str()
269             else:
270                 self._msg_type = None
271             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
272                           "VAT command history:", "", msg.message, count=1). \
273                 replace("\n\n", "\n").replace('\n', ' |br| ').\
274                 replace('\r', '').replace('"', "'")
275
276             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
277             self._data["tests"][self._test_ID]["vat-history"] += \
278                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
279
280     def _get_show_run(self, msg):
281         """Called when extraction of VPP operational data (output of CLI command
282         Show Runtime) is required.
283
284         :param msg: Message to process.
285         :type msg: Message
286         :returns: Nothing.
287         """
288         if msg.message.count("return STDOUT Thread "):
289             self._show_run_lookup_nr += 1
290             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
291                 self._data["tests"][self._test_ID]["show-run"] = str()
292             if self._lookup_kw_nr > 1:
293                 self._msg_type = None
294             if self._show_run_lookup_nr == 1:
295                 text = msg.message.replace("vat# ", "").\
296                     replace("return STDOUT ", "").replace("\n\n", "\n").\
297                     replace('\n', ' |br| ').\
298                     replace('\r', '').replace('"', "'")
299                 try:
300                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
301                     self._data["tests"][self._test_ID]["show-run"] += \
302                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
303                 except KeyError:
304                     pass
305
306     def _get_latency(self, msg, test_type):
307         """Get the latency data from the test message.
308
309         :param msg: Message to be parsed.
310         :param test_type: Type of the test - NDR or PDR.
311         :type msg: str
312         :type test_type: str
313         :returns: Latencies parsed from the message.
314         :rtype: dict
315         """
316
317         if test_type == "NDR":
318             groups = re.search(self.REGEX_LAT_NDR, msg)
319             groups_range = range(1, 7)
320         elif test_type == "PDR":
321             groups = re.search(self.REGEX_LAT_PDR, msg)
322             groups_range = range(1, 3)
323         else:
324             return {}
325
326         latencies = list()
327         for idx in groups_range:
328             try:
329                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
330             except (AttributeError, ValueError):
331                 lat = [-1, -1, -1]
332             latencies.append(lat)
333
334         keys = ("min", "avg", "max")
335         latency = {
336             "direction1": {
337             },
338             "direction2": {
339             }
340         }
341
342         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
343         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
344         if test_type == "NDR":
345             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
346             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
347             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
348             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
349
350         return latency
351
352     def visit_suite(self, suite):
353         """Implements traversing through the suite and its direct children.
354
355         :param suite: Suite to process.
356         :type suite: Suite
357         :returns: Nothing.
358         """
359         if self.start_suite(suite) is not False:
360             suite.suites.visit(self)
361             suite.tests.visit(self)
362             self.end_suite(suite)
363
364     def start_suite(self, suite):
365         """Called when suite starts.
366
367         :param suite: Suite to process.
368         :type suite: Suite
369         :returns: Nothing.
370         """
371
372         try:
373             parent_name = suite.parent.name
374         except AttributeError:
375             return
376
377         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
378             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
379         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
380
381         self._data["suites"][suite.longname.lower().replace('"', "'").
382             replace(" ", "_")] = {
383                 "name": suite.name.lower(),
384                 "doc": doc_str,
385                 "parent": parent_name,
386                 "level": len(suite.longname.split("."))
387             }
388
389         suite.keywords.visit(self)
390
391     def end_suite(self, suite):
392         """Called when suite ends.
393
394         :param suite: Suite to process.
395         :type suite: Suite
396         :returns: Nothing.
397         """
398         pass
399
400     def visit_test(self, test):
401         """Implements traversing through the test.
402
403         :param test: Test to process.
404         :type test: Test
405         :returns: Nothing.
406         """
407         if self.start_test(test) is not False:
408             test.keywords.visit(self)
409             self.end_test(test)
410
411     def start_test(self, test):
412         """Called when test starts.
413
414         :param test: Test to process.
415         :type test: Test
416         :returns: Nothing.
417         """
418
419         tags = [str(tag) for tag in test.tags]
420         test_result = dict()
421         test_result["name"] = test.name.lower()
422         test_result["parent"] = test.parent.name.lower()
423         test_result["tags"] = tags
424         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
425             replace('\r', '').replace('[', ' |br| [')
426         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
427         test_result["msg"] = test.message.replace('\n', ' |br| '). \
428             replace('\r', '').replace('"', "'")
429         if test.status == "PASS" and ("NDRPDRDISC" in tags or
430                                       "TCP" in tags or
431                                       "MRR" in tags):
432             if "NDRDISC" in tags:
433                 test_type = "NDR"
434             elif "PDRDISC" in tags:
435                 test_type = "PDR"
436             elif "TCP" in tags:
437                 test_type = "TCP"
438             elif "MRR" in tags:
439                 test_type = "MRR"
440             else:
441                 return
442
443             test_result["type"] = test_type
444
445             if test_type in ("NDR", "PDR"):
446                 try:
447                     rate_value = str(re.search(
448                         self.REGEX_RATE, test.message).group(1))
449                 except AttributeError:
450                     rate_value = "-1"
451                 try:
452                     rate_unit = str(re.search(
453                         self.REGEX_RATE, test.message).group(2))
454                 except AttributeError:
455                     rate_unit = "-1"
456
457                 test_result["throughput"] = dict()
458                 test_result["throughput"]["value"] = \
459                     int(rate_value.split('.')[0])
460                 test_result["throughput"]["unit"] = rate_unit
461                 test_result["latency"] = \
462                     self._get_latency(test.message, test_type)
463                 if test_type == "PDR":
464                     test_result["lossTolerance"] = str(re.search(
465                         self.REGEX_TOLERANCE, test.message).group(1))
466
467             elif test_type in ("TCP", ):
468                 groups = re.search(self.REGEX_TCP, test.message)
469                 test_result["result"] = dict()
470                 test_result["result"]["value"] = int(groups.group(2))
471                 test_result["result"]["unit"] = groups.group(1)
472             elif test_type in ("MRR", ):
473                 groups = re.search(self.REGEX_MRR, test.message)
474                 test_result["result"] = dict()
475                 test_result["result"]["duration"] = int(groups.group(1))
476                 test_result["result"]["tx"] = int(groups.group(2))
477                 test_result["result"]["rx"] = int(groups.group(3))
478                 test_result["result"]["throughput"] = int(
479                     test_result["result"]["rx"] /
480                     test_result["result"]["duration"])
481         else:
482             test_result["status"] = test.status
483
484         self._test_ID = test.longname.lower()
485         self._data["tests"][self._test_ID] = test_result
486
487     def end_test(self, test):
488         """Called when test ends.
489
490         :param test: Test to process.
491         :type test: Test
492         :returns: Nothing.
493         """
494         pass
495
496     def visit_keyword(self, keyword):
497         """Implements traversing through the keyword and its child keywords.
498
499         :param keyword: Keyword to process.
500         :type keyword: Keyword
501         :returns: Nothing.
502         """
503         if self.start_keyword(keyword) is not False:
504             self.end_keyword(keyword)
505
506     def start_keyword(self, keyword):
507         """Called when keyword starts. Default implementation does nothing.
508
509         :param keyword: Keyword to process.
510         :type keyword: Keyword
511         :returns: Nothing.
512         """
513         try:
514             if keyword.type == "setup":
515                 self.visit_setup_kw(keyword)
516             elif keyword.type == "teardown":
517                 self._lookup_kw_nr = 0
518                 self.visit_teardown_kw(keyword)
519             else:
520                 self._lookup_kw_nr = 0
521                 self.visit_test_kw(keyword)
522         except AttributeError:
523             pass
524
525     def end_keyword(self, keyword):
526         """Called when keyword ends. Default implementation does nothing.
527
528         :param keyword: Keyword to process.
529         :type keyword: Keyword
530         :returns: Nothing.
531         """
532         pass
533
534     def visit_test_kw(self, test_kw):
535         """Implements traversing through the test keyword and its child
536         keywords.
537
538         :param test_kw: Keyword to process.
539         :type test_kw: Keyword
540         :returns: Nothing.
541         """
542         for keyword in test_kw.keywords:
543             if self.start_test_kw(keyword) is not False:
544                 self.visit_test_kw(keyword)
545                 self.end_test_kw(keyword)
546
547     def start_test_kw(self, test_kw):
548         """Called when test keyword starts. Default implementation does
549         nothing.
550
551         :param test_kw: Keyword to process.
552         :type test_kw: Keyword
553         :returns: Nothing.
554         """
555         if test_kw.name.count("Show Runtime Counters On All Duts"):
556             self._lookup_kw_nr += 1
557             self._show_run_lookup_nr = 0
558             self._msg_type = "test-show-runtime"
559             test_kw.messages.visit(self)
560
561     def end_test_kw(self, test_kw):
562         """Called when keyword ends. Default implementation does nothing.
563
564         :param test_kw: Keyword to process.
565         :type test_kw: Keyword
566         :returns: Nothing.
567         """
568         pass
569
570     def visit_setup_kw(self, setup_kw):
571         """Implements traversing through the teardown keyword and its child
572         keywords.
573
574         :param setup_kw: Keyword to process.
575         :type setup_kw: Keyword
576         :returns: Nothing.
577         """
578         for keyword in setup_kw.keywords:
579             if self.start_setup_kw(keyword) is not False:
580                 self.visit_setup_kw(keyword)
581                 self.end_setup_kw(keyword)
582
583     def start_setup_kw(self, setup_kw):
584         """Called when teardown keyword starts. Default implementation does
585         nothing.
586
587         :param setup_kw: Keyword to process.
588         :type setup_kw: Keyword
589         :returns: Nothing.
590         """
591         if setup_kw.name.count("Show Vpp Version On All Duts") \
592                 and not self._version:
593             self._msg_type = "setup-version"
594             setup_kw.messages.visit(self)
595
596     def end_setup_kw(self, setup_kw):
597         """Called when keyword ends. Default implementation does nothing.
598
599         :param setup_kw: Keyword to process.
600         :type setup_kw: Keyword
601         :returns: Nothing.
602         """
603         pass
604
605     def visit_teardown_kw(self, teardown_kw):
606         """Implements traversing through the teardown keyword and its child
607         keywords.
608
609         :param teardown_kw: Keyword to process.
610         :type teardown_kw: Keyword
611         :returns: Nothing.
612         """
613         for keyword in teardown_kw.keywords:
614             if self.start_teardown_kw(keyword) is not False:
615                 self.visit_teardown_kw(keyword)
616                 self.end_teardown_kw(keyword)
617
618     def start_teardown_kw(self, teardown_kw):
619         """Called when teardown keyword starts. Default implementation does
620         nothing.
621
622         :param teardown_kw: Keyword to process.
623         :type teardown_kw: Keyword
624         :returns: Nothing.
625         """
626
627         if teardown_kw.name.count("Show Vat History On All Duts"):
628             self._vat_history_lookup_nr = 0
629             self._msg_type = "teardown-vat-history"
630             teardown_kw.messages.visit(self)
631
632     def end_teardown_kw(self, teardown_kw):
633         """Called when keyword ends. Default implementation does nothing.
634
635         :param teardown_kw: Keyword to process.
636         :type teardown_kw: Keyword
637         :returns: Nothing.
638         """
639         pass
640
641     def visit_message(self, msg):
642         """Implements visiting the message.
643
644         :param msg: Message to process.
645         :type msg: Message
646         :returns: Nothing.
647         """
648         if self.start_message(msg) is not False:
649             self.end_message(msg)
650
651     def start_message(self, msg):
652         """Called when message starts. Get required information from messages:
653         - VPP version.
654
655         :param msg: Message to process.
656         :type msg: Message
657         :returns: Nothing.
658         """
659
660         if self._msg_type:
661             self.parse_msg[self._msg_type](msg)
662
663     def end_message(self, msg):
664         """Called when message ends. Default implementation does nothing.
665
666         :param msg: Message to process.
667         :type msg: Message
668         :returns: Nothing.
669         """
670         pass
671
672
673 class InputData(object):
674     """Input data
675
676     The data is extracted from output.xml files generated by Jenkins jobs and
677     stored in pandas' DataFrames.
678
679     The data structure:
680     - job name
681       - build number
682         - metadata
683           - job
684           - build
685           - vpp version
686         - suites
687         - tests
688           - ID: test data (as described in ExecutionChecker documentation)
689     """
690
691     def __init__(self, spec):
692         """Initialization.
693
694         :param spec: Specification.
695         :type spec: Specification
696         """
697
698         # Specification:
699         self._cfg = spec
700
701         # Data store:
702         self._input_data = pd.Series()
703
704     @property
705     def data(self):
706         """Getter - Input data.
707
708         :returns: Input data
709         :rtype: pandas.Series
710         """
711         return self._input_data
712
713     def metadata(self, job, build):
714         """Getter - metadata
715
716         :param job: Job which metadata we want.
717         :param build: Build which metadata we want.
718         :type job: str
719         :type build: str
720         :returns: Metadata
721         :rtype: pandas.Series
722         """
723
724         return self.data[job][build]["metadata"]
725
726     def suites(self, job, build):
727         """Getter - suites
728
729         :param job: Job which suites we want.
730         :param build: Build which suites we want.
731         :type job: str
732         :type build: str
733         :returns: Suites.
734         :rtype: pandas.Series
735         """
736
737         return self.data[job][str(build)]["suites"]
738
739     def tests(self, job, build):
740         """Getter - tests
741
742         :param job: Job which tests we want.
743         :param build: Build which tests we want.
744         :type job: str
745         :type build: str
746         :returns: Tests.
747         :rtype: pandas.Series
748         """
749
750         return self.data[job][build]["tests"]
751
752     @staticmethod
753     def _parse_tests(job, build, log):
754         """Process data from robot output.xml file and return JSON structured
755         data.
756
757         :param job: The name of job which build output data will be processed.
758         :param build: The build which output data will be processed.
759         :param log: List of log messages.
760         :type job: str
761         :type build: dict
762         :type log: list of tuples (severity, msg)
763         :returns: JSON data structure.
764         :rtype: dict
765         """
766
767         metadata = {
768             "job": job,
769             "build": build
770         }
771
772         with open(build["file-name"], 'r') as data_file:
773             try:
774                 result = ExecutionResult(data_file)
775             except errors.DataError as err:
776                 log.append(("ERROR", "Error occurred while parsing output.xml: "
777                                      "{0}".format(err)))
778                 return None
779         checker = ExecutionChecker(metadata)
780         result.visit(checker)
781
782         return checker.data
783
784     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
785         """Download and parse the input data file.
786
787         :param pid: PID of the process executing this method.
788         :param data_queue: Shared memory between processes. Queue which keeps
789             the result data. This data is then read by the main process and used
790             in further processing.
791         :param job: Name of the Jenkins job which generated the processed input
792             file.
793         :param build: Information about the Jenkins build which generated the
794             processed input file.
795         :param repeat: Repeat the download specified number of times if not
796             successful.
797         :type pid: int
798         :type data_queue: multiprocessing.Manager().Queue()
799         :type job: str
800         :type build: dict
801         :type repeat: int
802         """
803
804         logs = list()
805
806         logging.info("  Processing the job/build: {0}: {1}".
807                      format(job, build["build"]))
808
809         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
810                      format(job, build["build"])))
811
812         state = "failed"
813         success = False
814         data = None
815         do_repeat = repeat
816         while do_repeat:
817             success = download_and_unzip_data_file(self._cfg, job, build, pid,
818                                                    logs)
819             if success:
820                 break
821             do_repeat -= 1
822         if not success:
823             logs.append(("ERROR", "It is not possible to download the input "
824                                   "data file from the job '{job}', build "
825                                   "'{build}', or it is damaged. Skipped.".
826                          format(job=job, build=build["build"])))
827         if success:
828             logs.append(("INFO", "  Processing data from the build '{0}' ...".
829                          format(build["build"])))
830             data = InputData._parse_tests(job, build, logs)
831             if data is None:
832                 logs.append(("ERROR", "Input data file from the job '{job}', "
833                                       "build '{build}' is damaged. Skipped.".
834                              format(job=job, build=build["build"])))
835             else:
836                 state = "processed"
837
838             try:
839                 remove(build["file-name"])
840             except OSError as err:
841                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
842                              format(build["file-name"], err)))
843         logs.append(("INFO", "  Done."))
844
845         result = {
846             "data": data,
847             "state": state,
848             "job": job,
849             "build": build,
850             "logs": logs
851         }
852         data_queue.put(result)
853
854     def download_and_parse_data(self, repeat=1):
855         """Download the input data files, parse input data from input files and
856         store in pandas' Series.
857
858         :param repeat: Repeat the download specified number of times if not
859             successful.
860         :type repeat: int
861         """
862
863         logging.info("Downloading and parsing input files ...")
864
865         work_queue = multiprocessing.JoinableQueue()
866
867         manager = multiprocessing.Manager()
868
869         data_queue = manager.Queue()
870
871         cpus = multiprocessing.cpu_count()
872         workers = list()
873         for cpu in range(cpus):
874             worker = Worker(work_queue,
875                             data_queue,
876                             self._download_and_parse_build)
877             worker.daemon = True
878             worker.start()
879             workers.append(worker)
880             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
881                       format(cpu, worker.pid))
882
883         for job, builds in self._cfg.builds.items():
884             for build in builds:
885                 work_queue.put((job, build, repeat))
886
887         work_queue.join()
888
889         logging.info("Done.")
890
891         while not data_queue.empty():
892             result = data_queue.get()
893
894             job = result["job"]
895             build_nr = result["build"]["build"]
896
897             if result["data"]:
898                 data = result["data"]
899                 build_data = pd.Series({
900                     "metadata": pd.Series(data["metadata"].values(),
901                                           index=data["metadata"].keys()),
902                     "suites": pd.Series(data["suites"].values(),
903                                         index=data["suites"].keys()),
904                     "tests": pd.Series(data["tests"].values(),
905                                        index=data["tests"].keys())})
906
907                 if self._input_data.get(job, None) is None:
908                     self._input_data[job] = pd.Series()
909                 self._input_data[job][str(build_nr)] = build_data
910
911                 self._cfg.set_input_file_name(job, build_nr,
912                                               result["build"]["file-name"])
913
914             self._cfg.set_input_state(job, build_nr, result["state"])
915
916             for item in result["logs"]:
917                 if item[0] == "INFO":
918                     logging.info(item[1])
919                 elif item[0] == "ERROR":
920                     logging.error(item[1])
921                 elif item[0] == "DEBUG":
922                     logging.debug(item[1])
923                 elif item[0] == "CRITICAL":
924                     logging.critical(item[1])
925                 elif item[0] == "WARNING":
926                     logging.warning(item[1])
927
928         del data_queue
929
930         # Terminate all workers
931         for worker in workers:
932             worker.terminate()
933             worker.join()
934
935         logging.info("Done.")
936
937     @staticmethod
938     def _end_of_tag(tag_filter, start=0, closer="'"):
939         """Return the index of character in the string which is the end of tag.
940
941         :param tag_filter: The string where the end of tag is being searched.
942         :param start: The index where the searching is stated.
943         :param closer: The character which is the tag closer.
944         :type tag_filter: str
945         :type start: int
946         :type closer: str
947         :returns: The index of the tag closer.
948         :rtype: int
949         """
950
951         try:
952             idx_opener = tag_filter.index(closer, start)
953             return tag_filter.index(closer, idx_opener + 1)
954         except ValueError:
955             return None
956
957     @staticmethod
958     def _condition(tag_filter):
959         """Create a conditional statement from the given tag filter.
960
961         :param tag_filter: Filter based on tags from the element specification.
962         :type tag_filter: str
963         :returns: Conditional statement which can be evaluated.
964         :rtype: str
965         """
966
967         index = 0
968         while True:
969             index = InputData._end_of_tag(tag_filter, index)
970             if index is None:
971                 return tag_filter
972             index += 1
973             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
974
975     def filter_data(self, element, params=None, data_set="tests",
976                     continue_on_error=False):
977         """Filter required data from the given jobs and builds.
978
979         The output data structure is:
980
981         - job 1
982           - build 1
983             - test (suite) 1 ID:
984               - param 1
985               - param 2
986               ...
987               - param n
988             ...
989             - test (suite) n ID:
990             ...
991           ...
992           - build n
993         ...
994         - job n
995
996         :param element: Element which will use the filtered data.
997         :param params: Parameters which will be included in the output. If None,
998         all parameters are included.
999         :param data_set: The set of data to be filtered: tests, suites,
1000         metadata.
1001         :param continue_on_error: Continue if there is error while reading the
1002         data. The Item will be empty then
1003         :type element: pandas.Series
1004         :type params: list
1005         :type data_set: str
1006         :type continue_on_error: bool
1007         :returns: Filtered data.
1008         :rtype pandas.Series
1009         """
1010
1011         logging.info("    Creating the data set for the {0} '{1}'.".
1012                      format(element.get("type", ""), element.get("title", "")))
1013
1014         try:
1015             if element["filter"] in ("all", "template"):
1016                 cond = "True"
1017             else:
1018                 cond = InputData._condition(element["filter"])
1019             logging.debug("   Filter: {0}".format(cond))
1020         except KeyError:
1021             logging.error("  No filter defined.")
1022             return None
1023
1024         if params is None:
1025             params = element.get("parameters", None)
1026
1027         data = pd.Series()
1028         try:
1029             for job, builds in element["data"].items():
1030                 data[job] = pd.Series()
1031                 for build in builds:
1032                     data[job][str(build)] = pd.Series()
1033                     try:
1034                         data_iter = self.data[job][str(build)][data_set].\
1035                             iteritems()
1036                     except KeyError:
1037                         if continue_on_error:
1038                             continue
1039                         else:
1040                             return None
1041                     for test_ID, test_data in data_iter:
1042                         if eval(cond, {"tags": test_data.get("tags", "")}):
1043                             data[job][str(build)][test_ID] = pd.Series()
1044                             if params is None:
1045                                 for param, val in test_data.items():
1046                                     data[job][str(build)][test_ID][param] = val
1047                             else:
1048                                 for param in params:
1049                                     try:
1050                                         data[job][str(build)][test_ID][param] =\
1051                                             test_data[param]
1052                                     except KeyError:
1053                                         data[job][str(build)][test_ID][param] =\
1054                                             "No Data"
1055             return data
1056
1057         except (KeyError, IndexError, ValueError) as err:
1058             logging.error("   Missing mandatory parameter in the element "
1059                           "specification: {0}".format(err))
1060             return None
1061         except AttributeError:
1062             return None
1063         except SyntaxError:
1064             logging.error("   The filter '{0}' is not correct. Check if all "
1065                           "tags are enclosed by apostrophes.".format(cond))
1066             return None
1067
1068     @staticmethod
1069     def merge_data(data):
1070         """Merge data from more jobs and builds to a simple data structure.
1071
1072         The output data structure is:
1073
1074         - test (suite) 1 ID:
1075           - param 1
1076           - param 2
1077           ...
1078           - param n
1079         ...
1080         - test (suite) n ID:
1081         ...
1082
1083         :param data: Data to merge.
1084         :type data: pandas.Series
1085         :returns: Merged data.
1086         :rtype: pandas.Series
1087         """
1088
1089         logging.info("    Merging data ...")
1090
1091         merged_data = pd.Series()
1092         for _, builds in data.iteritems():
1093             for _, item in builds.iteritems():
1094                 for ID, item_data in item.iteritems():
1095                     merged_data[ID] = item_data
1096
1097         return merged_data
1098
1099
1100 class Worker(multiprocessing.Process):
1101     """Worker class used to download and process input files in separate
1102     parallel processes.
1103     """
1104
1105     def __init__(self, work_queue, data_queue, func):
1106         """Initialization.
1107
1108         :param work_queue: Queue with items to process.
1109         :param data_queue: Shared memory between processes. Queue which keeps
1110             the result data. This data is then read by the main process and used
1111             in further processing.
1112         :param func: Function which is executed by the worker.
1113         :type work_queue: multiprocessing.JoinableQueue
1114         :type data_queue: multiprocessing.Manager().Queue()
1115         :type func: Callable object
1116         """
1117         super(Worker, self).__init__()
1118         self._work_queue = work_queue
1119         self._data_queue = data_queue
1120         self._func = func
1121
1122     def run(self):
1123         """Method representing the process's activity.
1124         """
1125
1126         while True:
1127             try:
1128                 self.process(self._work_queue.get())
1129             finally:
1130                 self._work_queue.task_done()
1131
1132     def process(self, item_to_process):
1133         """Method executed by the runner.
1134
1135         :param item_to_process: Data to be processed by the function.
1136         :type item_to_process: tuple
1137         """
1138         self._func(self.pid, self._data_queue, *item_to_process)