CSIT-1104: Trending: Speed-up plots generation
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 """
20
21 import multiprocessing
22 import os
23 import re
24 import pandas as pd
25 import logging
26
27 from robot.api import ExecutionResult, ResultVisitor
28 from robot import errors
29 from collections import OrderedDict
30 from string import replace
31 from os import remove
32
33 from input_data_files import download_and_unzip_data_file
34 from utils import Worker
35
36
37 class ExecutionChecker(ResultVisitor):
38     """Class to traverse through the test suite structure.
39
40     The functionality implemented in this class generates a json structure:
41
42     Performance tests:
43
44     {
45         "metadata": {  # Optional
46             "version": "VPP version",
47             "job": "Jenkins job name",
48             "build": "Information about the build"
49         },
50         "suites": {
51             "Suite name 1": {
52                 "doc": "Suite 1 documentation",
53                 "parent": "Suite 1 parent",
54                 "level": "Level of the suite in the suite hierarchy"
55             }
56             "Suite name N": {
57                 "doc": "Suite N documentation",
58                 "parent": "Suite 2 parent",
59                 "level": "Level of the suite in the suite hierarchy"
60             }
61         }
62         "tests": {
63             "ID": {
64                 "name": "Test name",
65                 "parent": "Name of the parent of the test",
66                 "doc": "Test documentation"
67                 "msg": "Test message"
68                 "tags": ["tag 1", "tag 2", "tag n"],
69                 "type": "PDR" | "NDR",
70                 "throughput": {
71                     "value": int,
72                     "unit": "pps" | "bps" | "percentage"
73                 },
74                 "latency": {
75                     "direction1": {
76                         "100": {
77                             "min": int,
78                             "avg": int,
79                             "max": int
80                         },
81                         "50": {  # Only for NDR
82                             "min": int,
83                             "avg": int,
84                             "max": int
85                         },
86                         "10": {  # Only for NDR
87                             "min": int,
88                             "avg": int,
89                             "max": int
90                         }
91                     },
92                     "direction2": {
93                         "100": {
94                             "min": int,
95                             "avg": int,
96                             "max": int
97                         },
98                         "50": {  # Only for NDR
99                             "min": int,
100                             "avg": int,
101                             "max": int
102                         },
103                         "10": {  # Only for NDR
104                             "min": int,
105                             "avg": int,
106                             "max": int
107                         }
108                     }
109                 },
110                 "lossTolerance": "lossTolerance",  # Only for PDR
111                 "vat-history": "DUT1 and DUT2 VAT History"
112                 },
113                 "show-run": "Show Run"
114             },
115             "ID" {
116                 # next test
117             }
118         }
119     }
120
121     Functional tests:
122
123
124     {
125         "metadata": {  # Optional
126             "version": "VPP version",
127             "job": "Jenkins job name",
128             "build": "Information about the build"
129         },
130         "suites": {
131             "Suite name 1": {
132                 "doc": "Suite 1 documentation",
133                 "parent": "Suite 1 parent",
134                 "level": "Level of the suite in the suite hierarchy"
135             }
136             "Suite name N": {
137                 "doc": "Suite N documentation",
138                 "parent": "Suite 2 parent",
139                 "level": "Level of the suite in the suite hierarchy"
140             }
141         }
142         "tests": {
143             "ID": {
144                 "name": "Test name",
145                 "parent": "Name of the parent of the test",
146                 "doc": "Test documentation"
147                 "msg": "Test message"
148                 "tags": ["tag 1", "tag 2", "tag n"],
149                 "vat-history": "DUT1 and DUT2 VAT History"
150                 "show-run": "Show Run"
151                 "status": "PASS" | "FAIL"
152             },
153             "ID" {
154                 # next test
155             }
156         }
157     }
158
159     .. note:: ID is the lowercase full path to the test.
160     """
161
162     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
163
164     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
165                                r'LAT_\d+%NDR:\s\[\'(-?\d+\/-?\d+/-?\d+)\','
166                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
167                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
168                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
169                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
170                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
171
172     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
173                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
174                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
175
176     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
177                                  r'[\D\d]*')
178
179     REGEX_VERSION = re.compile(r"(return STDOUT Version:\s*)(.*)")
180
181     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
182
183     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
184                            r'tx\s(\d*),\srx\s(\d*)')
185
186     def __init__(self, metadata):
187         """Initialisation.
188
189         :param metadata: Key-value pairs to be included in "metadata" part of
190         JSON structure.
191         :type metadata: dict
192         """
193
194         # Type of message to parse out from the test messages
195         self._msg_type = None
196
197         # VPP version
198         self._version = None
199
200         # Number of VAT History messages found:
201         # 0 - no message
202         # 1 - VAT History of DUT1
203         # 2 - VAT History of DUT2
204         self._lookup_kw_nr = 0
205         self._vat_history_lookup_nr = 0
206
207         # Number of Show Running messages found
208         # 0 - no message
209         # 1 - Show run message found
210         self._show_run_lookup_nr = 0
211
212         # Test ID of currently processed test- the lowercase full path to the
213         # test
214         self._test_ID = None
215
216         # The main data structure
217         self._data = {
218             "metadata": OrderedDict(),
219             "suites": OrderedDict(),
220             "tests": OrderedDict()
221         }
222
223         # Save the provided metadata
224         for key, val in metadata.items():
225             self._data["metadata"][key] = val
226
227         # Dictionary defining the methods used to parse different types of
228         # messages
229         self.parse_msg = {
230             "setup-version": self._get_version,
231             "teardown-vat-history": self._get_vat_history,
232             "test-show-runtime": self._get_show_run
233         }
234
235     @property
236     def data(self):
237         """Getter - Data parsed from the XML file.
238
239         :returns: Data parsed from the XML file.
240         :rtype: dict
241         """
242         return self._data
243
244     def _get_version(self, msg):
245         """Called when extraction of VPP version is required.
246
247         :param msg: Message to process.
248         :type msg: Message
249         :returns: Nothing.
250         """
251
252         if msg.message.count("return STDOUT Version:"):
253             self._version = str(re.search(self.REGEX_VERSION, msg.message).
254                                 group(2))
255             self._data["metadata"]["version"] = self._version
256             self._data["metadata"]["generated"] = msg.timestamp
257             self._msg_type = None
258
259     def _get_vat_history(self, msg):
260         """Called when extraction of VAT command history is required.
261
262         :param msg: Message to process.
263         :type msg: Message
264         :returns: Nothing.
265         """
266         if msg.message.count("VAT command history:"):
267             self._vat_history_lookup_nr += 1
268             if self._vat_history_lookup_nr == 1:
269                 self._data["tests"][self._test_ID]["vat-history"] = str()
270             else:
271                 self._msg_type = None
272             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
273                           "VAT command history:", "", msg.message, count=1). \
274                 replace("\n\n", "\n").replace('\n', ' |br| ').\
275                 replace('\r', '').replace('"', "'")
276
277             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
278             self._data["tests"][self._test_ID]["vat-history"] += \
279                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
280
281     def _get_show_run(self, msg):
282         """Called when extraction of VPP operational data (output of CLI command
283         Show Runtime) is required.
284
285         :param msg: Message to process.
286         :type msg: Message
287         :returns: Nothing.
288         """
289         if msg.message.count("return STDOUT Thread "):
290             self._show_run_lookup_nr += 1
291             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
292                 self._data["tests"][self._test_ID]["show-run"] = str()
293             if self._lookup_kw_nr > 1:
294                 self._msg_type = None
295             if self._show_run_lookup_nr == 1:
296                 text = msg.message.replace("vat# ", "").\
297                     replace("return STDOUT ", "").replace("\n\n", "\n").\
298                     replace('\n', ' |br| ').\
299                     replace('\r', '').replace('"', "'")
300                 try:
301                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
302                     self._data["tests"][self._test_ID]["show-run"] += \
303                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
304                 except KeyError:
305                     pass
306
307     def _get_latency(self, msg, test_type):
308         """Get the latency data from the test message.
309
310         :param msg: Message to be parsed.
311         :param test_type: Type of the test - NDR or PDR.
312         :type msg: str
313         :type test_type: str
314         :returns: Latencies parsed from the message.
315         :rtype: dict
316         """
317
318         if test_type == "NDR":
319             groups = re.search(self.REGEX_LAT_NDR, msg)
320             groups_range = range(1, 7)
321         elif test_type == "PDR":
322             groups = re.search(self.REGEX_LAT_PDR, msg)
323             groups_range = range(1, 3)
324         else:
325             return {}
326
327         latencies = list()
328         for idx in groups_range:
329             try:
330                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
331             except (AttributeError, ValueError):
332                 lat = [-1, -1, -1]
333             latencies.append(lat)
334
335         keys = ("min", "avg", "max")
336         latency = {
337             "direction1": {
338             },
339             "direction2": {
340             }
341         }
342
343         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
344         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
345         if test_type == "NDR":
346             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
347             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
348             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
349             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
350
351         return latency
352
353     def visit_suite(self, suite):
354         """Implements traversing through the suite and its direct children.
355
356         :param suite: Suite to process.
357         :type suite: Suite
358         :returns: Nothing.
359         """
360         if self.start_suite(suite) is not False:
361             suite.suites.visit(self)
362             suite.tests.visit(self)
363             self.end_suite(suite)
364
365     def start_suite(self, suite):
366         """Called when suite starts.
367
368         :param suite: Suite to process.
369         :type suite: Suite
370         :returns: Nothing.
371         """
372
373         try:
374             parent_name = suite.parent.name
375         except AttributeError:
376             return
377
378         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
379             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
380         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
381
382         self._data["suites"][suite.longname.lower().replace('"', "'").
383             replace(" ", "_")] = {
384                 "name": suite.name.lower(),
385                 "doc": doc_str,
386                 "parent": parent_name,
387                 "level": len(suite.longname.split("."))
388             }
389
390         suite.keywords.visit(self)
391
392     def end_suite(self, suite):
393         """Called when suite ends.
394
395         :param suite: Suite to process.
396         :type suite: Suite
397         :returns: Nothing.
398         """
399         pass
400
401     def visit_test(self, test):
402         """Implements traversing through the test.
403
404         :param test: Test to process.
405         :type test: Test
406         :returns: Nothing.
407         """
408         if self.start_test(test) is not False:
409             test.keywords.visit(self)
410             self.end_test(test)
411
412     def start_test(self, test):
413         """Called when test starts.
414
415         :param test: Test to process.
416         :type test: Test
417         :returns: Nothing.
418         """
419
420         tags = [str(tag) for tag in test.tags]
421         test_result = dict()
422         test_result["name"] = test.name.lower()
423         test_result["parent"] = test.parent.name.lower()
424         test_result["tags"] = tags
425         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
426             replace('\r', '').replace('[', ' |br| [')
427         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
428         test_result["msg"] = test.message.replace('\n', ' |br| '). \
429             replace('\r', '').replace('"', "'")
430         if test.status == "PASS" and ("NDRPDRDISC" in tags or
431                                       "TCP" in tags or
432                                       "MRR" in tags):
433             if "NDRDISC" in tags:
434                 test_type = "NDR"
435             elif "PDRDISC" in tags:
436                 test_type = "PDR"
437             elif "TCP" in tags:
438                 test_type = "TCP"
439             elif "MRR" in tags:
440                 test_type = "MRR"
441             else:
442                 return
443
444             test_result["type"] = test_type
445
446             if test_type in ("NDR", "PDR"):
447                 try:
448                     rate_value = str(re.search(
449                         self.REGEX_RATE, test.message).group(1))
450                 except AttributeError:
451                     rate_value = "-1"
452                 try:
453                     rate_unit = str(re.search(
454                         self.REGEX_RATE, test.message).group(2))
455                 except AttributeError:
456                     rate_unit = "-1"
457
458                 test_result["throughput"] = dict()
459                 test_result["throughput"]["value"] = \
460                     int(rate_value.split('.')[0])
461                 test_result["throughput"]["unit"] = rate_unit
462                 test_result["latency"] = \
463                     self._get_latency(test.message, test_type)
464                 if test_type == "PDR":
465                     test_result["lossTolerance"] = str(re.search(
466                         self.REGEX_TOLERANCE, test.message).group(1))
467
468             elif test_type in ("TCP", ):
469                 groups = re.search(self.REGEX_TCP, test.message)
470                 test_result["result"] = dict()
471                 test_result["result"]["value"] = int(groups.group(2))
472                 test_result["result"]["unit"] = groups.group(1)
473             elif test_type in ("MRR", ):
474                 groups = re.search(self.REGEX_MRR, test.message)
475                 test_result["result"] = dict()
476                 test_result["result"]["duration"] = int(groups.group(1))
477                 test_result["result"]["tx"] = int(groups.group(2))
478                 test_result["result"]["rx"] = int(groups.group(3))
479                 test_result["result"]["throughput"] = int(
480                     test_result["result"]["rx"] /
481                     test_result["result"]["duration"])
482         else:
483             test_result["status"] = test.status
484
485         self._test_ID = test.longname.lower()
486         self._data["tests"][self._test_ID] = test_result
487
488     def end_test(self, test):
489         """Called when test ends.
490
491         :param test: Test to process.
492         :type test: Test
493         :returns: Nothing.
494         """
495         pass
496
497     def visit_keyword(self, keyword):
498         """Implements traversing through the keyword and its child keywords.
499
500         :param keyword: Keyword to process.
501         :type keyword: Keyword
502         :returns: Nothing.
503         """
504         if self.start_keyword(keyword) is not False:
505             self.end_keyword(keyword)
506
507     def start_keyword(self, keyword):
508         """Called when keyword starts. Default implementation does nothing.
509
510         :param keyword: Keyword to process.
511         :type keyword: Keyword
512         :returns: Nothing.
513         """
514         try:
515             if keyword.type == "setup":
516                 self.visit_setup_kw(keyword)
517             elif keyword.type == "teardown":
518                 self._lookup_kw_nr = 0
519                 self.visit_teardown_kw(keyword)
520             else:
521                 self._lookup_kw_nr = 0
522                 self.visit_test_kw(keyword)
523         except AttributeError:
524             pass
525
526     def end_keyword(self, keyword):
527         """Called when keyword ends. Default implementation does nothing.
528
529         :param keyword: Keyword to process.
530         :type keyword: Keyword
531         :returns: Nothing.
532         """
533         pass
534
535     def visit_test_kw(self, test_kw):
536         """Implements traversing through the test keyword and its child
537         keywords.
538
539         :param test_kw: Keyword to process.
540         :type test_kw: Keyword
541         :returns: Nothing.
542         """
543         for keyword in test_kw.keywords:
544             if self.start_test_kw(keyword) is not False:
545                 self.visit_test_kw(keyword)
546                 self.end_test_kw(keyword)
547
548     def start_test_kw(self, test_kw):
549         """Called when test keyword starts. Default implementation does
550         nothing.
551
552         :param test_kw: Keyword to process.
553         :type test_kw: Keyword
554         :returns: Nothing.
555         """
556         if test_kw.name.count("Show Runtime Counters On All Duts"):
557             self._lookup_kw_nr += 1
558             self._show_run_lookup_nr = 0
559             self._msg_type = "test-show-runtime"
560             test_kw.messages.visit(self)
561
562     def end_test_kw(self, test_kw):
563         """Called when keyword ends. Default implementation does nothing.
564
565         :param test_kw: Keyword to process.
566         :type test_kw: Keyword
567         :returns: Nothing.
568         """
569         pass
570
571     def visit_setup_kw(self, setup_kw):
572         """Implements traversing through the teardown keyword and its child
573         keywords.
574
575         :param setup_kw: Keyword to process.
576         :type setup_kw: Keyword
577         :returns: Nothing.
578         """
579         for keyword in setup_kw.keywords:
580             if self.start_setup_kw(keyword) is not False:
581                 self.visit_setup_kw(keyword)
582                 self.end_setup_kw(keyword)
583
584     def start_setup_kw(self, setup_kw):
585         """Called when teardown keyword starts. Default implementation does
586         nothing.
587
588         :param setup_kw: Keyword to process.
589         :type setup_kw: Keyword
590         :returns: Nothing.
591         """
592         if setup_kw.name.count("Show Vpp Version On All Duts") \
593                 and not self._version:
594             self._msg_type = "setup-version"
595             setup_kw.messages.visit(self)
596
597     def end_setup_kw(self, setup_kw):
598         """Called when keyword ends. Default implementation does nothing.
599
600         :param setup_kw: Keyword to process.
601         :type setup_kw: Keyword
602         :returns: Nothing.
603         """
604         pass
605
606     def visit_teardown_kw(self, teardown_kw):
607         """Implements traversing through the teardown keyword and its child
608         keywords.
609
610         :param teardown_kw: Keyword to process.
611         :type teardown_kw: Keyword
612         :returns: Nothing.
613         """
614         for keyword in teardown_kw.keywords:
615             if self.start_teardown_kw(keyword) is not False:
616                 self.visit_teardown_kw(keyword)
617                 self.end_teardown_kw(keyword)
618
619     def start_teardown_kw(self, teardown_kw):
620         """Called when teardown keyword starts. Default implementation does
621         nothing.
622
623         :param teardown_kw: Keyword to process.
624         :type teardown_kw: Keyword
625         :returns: Nothing.
626         """
627
628         if teardown_kw.name.count("Show Vat History On All Duts"):
629             self._vat_history_lookup_nr = 0
630             self._msg_type = "teardown-vat-history"
631             teardown_kw.messages.visit(self)
632
633     def end_teardown_kw(self, teardown_kw):
634         """Called when keyword ends. Default implementation does nothing.
635
636         :param teardown_kw: Keyword to process.
637         :type teardown_kw: Keyword
638         :returns: Nothing.
639         """
640         pass
641
642     def visit_message(self, msg):
643         """Implements visiting the message.
644
645         :param msg: Message to process.
646         :type msg: Message
647         :returns: Nothing.
648         """
649         if self.start_message(msg) is not False:
650             self.end_message(msg)
651
652     def start_message(self, msg):
653         """Called when message starts. Get required information from messages:
654         - VPP version.
655
656         :param msg: Message to process.
657         :type msg: Message
658         :returns: Nothing.
659         """
660
661         if self._msg_type:
662             self.parse_msg[self._msg_type](msg)
663
664     def end_message(self, msg):
665         """Called when message ends. Default implementation does nothing.
666
667         :param msg: Message to process.
668         :type msg: Message
669         :returns: Nothing.
670         """
671         pass
672
673
674 class InputData(object):
675     """Input data
676
677     The data is extracted from output.xml files generated by Jenkins jobs and
678     stored in pandas' DataFrames.
679
680     The data structure:
681     - job name
682       - build number
683         - metadata
684           - job
685           - build
686           - vpp version
687         - suites
688         - tests
689           - ID: test data (as described in ExecutionChecker documentation)
690     """
691
692     def __init__(self, spec):
693         """Initialization.
694
695         :param spec: Specification.
696         :type spec: Specification
697         """
698
699         # Specification:
700         self._cfg = spec
701
702         # Data store:
703         self._input_data = pd.Series()
704
705     @property
706     def data(self):
707         """Getter - Input data.
708
709         :returns: Input data
710         :rtype: pandas.Series
711         """
712         return self._input_data
713
714     def metadata(self, job, build):
715         """Getter - metadata
716
717         :param job: Job which metadata we want.
718         :param build: Build which metadata we want.
719         :type job: str
720         :type build: str
721         :returns: Metadata
722         :rtype: pandas.Series
723         """
724
725         return self.data[job][build]["metadata"]
726
727     def suites(self, job, build):
728         """Getter - suites
729
730         :param job: Job which suites we want.
731         :param build: Build which suites we want.
732         :type job: str
733         :type build: str
734         :returns: Suites.
735         :rtype: pandas.Series
736         """
737
738         return self.data[job][str(build)]["suites"]
739
740     def tests(self, job, build):
741         """Getter - tests
742
743         :param job: Job which tests we want.
744         :param build: Build which tests we want.
745         :type job: str
746         :type build: str
747         :returns: Tests.
748         :rtype: pandas.Series
749         """
750
751         return self.data[job][build]["tests"]
752
753     @staticmethod
754     def _parse_tests(job, build, log):
755         """Process data from robot output.xml file and return JSON structured
756         data.
757
758         :param job: The name of job which build output data will be processed.
759         :param build: The build which output data will be processed.
760         :param log: List of log messages.
761         :type job: str
762         :type build: dict
763         :type log: list of tuples (severity, msg)
764         :returns: JSON data structure.
765         :rtype: dict
766         """
767
768         metadata = {
769             "job": job,
770             "build": build
771         }
772
773         with open(build["file-name"], 'r') as data_file:
774             try:
775                 result = ExecutionResult(data_file)
776             except errors.DataError as err:
777                 log.append(("ERROR", "Error occurred while parsing output.xml: "
778                                      "{0}".format(err)))
779                 return None
780         checker = ExecutionChecker(metadata)
781         result.visit(checker)
782
783         return checker.data
784
785     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
786         """Download and parse the input data file.
787
788         :param pid: PID of the process executing this method.
789         :param data_queue: Shared memory between processes. Queue which keeps
790             the result data. This data is then read by the main process and used
791             in further processing.
792         :param job: Name of the Jenkins job which generated the processed input
793             file.
794         :param build: Information about the Jenkins build which generated the
795             processed input file.
796         :param repeat: Repeat the download specified number of times if not
797             successful.
798         :type pid: int
799         :type data_queue: multiprocessing.Manager().Queue()
800         :type job: str
801         :type build: dict
802         :type repeat: int
803         """
804
805         logs = list()
806
807         logging.info("  Processing the job/build: {0}: {1}".
808                      format(job, build["build"]))
809
810         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
811                      format(job, build["build"])))
812
813         state = "failed"
814         success = False
815         data = None
816         do_repeat = repeat
817         while do_repeat:
818             success = download_and_unzip_data_file(self._cfg, job, build, pid,
819                                                    logs)
820             if success:
821                 break
822             do_repeat -= 1
823         if not success:
824             logs.append(("ERROR", "It is not possible to download the input "
825                                   "data file from the job '{job}', build "
826                                   "'{build}', or it is damaged. Skipped.".
827                          format(job=job, build=build["build"])))
828         if success:
829             logs.append(("INFO", "  Processing data from the build '{0}' ...".
830                          format(build["build"])))
831             data = InputData._parse_tests(job, build, logs)
832             if data is None:
833                 logs.append(("ERROR", "Input data file from the job '{job}', "
834                                       "build '{build}' is damaged. Skipped.".
835                              format(job=job, build=build["build"])))
836             else:
837                 state = "processed"
838
839             try:
840                 remove(build["file-name"])
841             except OSError as err:
842                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
843                              format(build["file-name"], err)))
844         logs.append(("INFO", "  Done."))
845
846         result = {
847             "data": data,
848             "state": state,
849             "job": job,
850             "build": build,
851             "logs": logs
852         }
853         data_queue.put(result)
854
855     def download_and_parse_data(self, repeat=1):
856         """Download the input data files, parse input data from input files and
857         store in pandas' Series.
858
859         :param repeat: Repeat the download specified number of times if not
860             successful.
861         :type repeat: int
862         """
863
864         logging.info("Downloading and parsing input files ...")
865
866         work_queue = multiprocessing.JoinableQueue()
867         manager = multiprocessing.Manager()
868         data_queue = manager.Queue()
869         cpus = multiprocessing.cpu_count()
870
871         workers = list()
872         for cpu in range(cpus):
873             worker = Worker(work_queue,
874                             data_queue,
875                             self._download_and_parse_build)
876             worker.daemon = True
877             worker.start()
878             workers.append(worker)
879             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
880                       format(cpu, worker.pid))
881
882         for job, builds in self._cfg.builds.items():
883             for build in builds:
884                 work_queue.put((job, build, repeat))
885
886         work_queue.join()
887
888         logging.info("Done.")
889
890         while not data_queue.empty():
891             result = data_queue.get()
892
893             job = result["job"]
894             build_nr = result["build"]["build"]
895
896             if result["data"]:
897                 data = result["data"]
898                 build_data = pd.Series({
899                     "metadata": pd.Series(data["metadata"].values(),
900                                           index=data["metadata"].keys()),
901                     "suites": pd.Series(data["suites"].values(),
902                                         index=data["suites"].keys()),
903                     "tests": pd.Series(data["tests"].values(),
904                                        index=data["tests"].keys())})
905
906                 if self._input_data.get(job, None) is None:
907                     self._input_data[job] = pd.Series()
908                 self._input_data[job][str(build_nr)] = build_data
909
910                 self._cfg.set_input_file_name(job, build_nr,
911                                               result["build"]["file-name"])
912
913             self._cfg.set_input_state(job, build_nr, result["state"])
914
915             for item in result["logs"]:
916                 if item[0] == "INFO":
917                     logging.info(item[1])
918                 elif item[0] == "ERROR":
919                     logging.error(item[1])
920                 elif item[0] == "DEBUG":
921                     logging.debug(item[1])
922                 elif item[0] == "CRITICAL":
923                     logging.critical(item[1])
924                 elif item[0] == "WARNING":
925                     logging.warning(item[1])
926
927         del data_queue
928
929         # Terminate all workers
930         for worker in workers:
931             worker.terminate()
932             worker.join()
933
934         logging.info("Done.")
935
936     @staticmethod
937     def _end_of_tag(tag_filter, start=0, closer="'"):
938         """Return the index of character in the string which is the end of tag.
939
940         :param tag_filter: The string where the end of tag is being searched.
941         :param start: The index where the searching is stated.
942         :param closer: The character which is the tag closer.
943         :type tag_filter: str
944         :type start: int
945         :type closer: str
946         :returns: The index of the tag closer.
947         :rtype: int
948         """
949
950         try:
951             idx_opener = tag_filter.index(closer, start)
952             return tag_filter.index(closer, idx_opener + 1)
953         except ValueError:
954             return None
955
956     @staticmethod
957     def _condition(tag_filter):
958         """Create a conditional statement from the given tag filter.
959
960         :param tag_filter: Filter based on tags from the element specification.
961         :type tag_filter: str
962         :returns: Conditional statement which can be evaluated.
963         :rtype: str
964         """
965
966         index = 0
967         while True:
968             index = InputData._end_of_tag(tag_filter, index)
969             if index is None:
970                 return tag_filter
971             index += 1
972             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
973
974     def filter_data(self, element, params=None, data_set="tests",
975                     continue_on_error=False):
976         """Filter required data from the given jobs and builds.
977
978         The output data structure is:
979
980         - job 1
981           - build 1
982             - test (suite) 1 ID:
983               - param 1
984               - param 2
985               ...
986               - param n
987             ...
988             - test (suite) n ID:
989             ...
990           ...
991           - build n
992         ...
993         - job n
994
995         :param element: Element which will use the filtered data.
996         :param params: Parameters which will be included in the output. If None,
997         all parameters are included.
998         :param data_set: The set of data to be filtered: tests, suites,
999         metadata.
1000         :param continue_on_error: Continue if there is error while reading the
1001         data. The Item will be empty then
1002         :type element: pandas.Series
1003         :type params: list
1004         :type data_set: str
1005         :type continue_on_error: bool
1006         :returns: Filtered data.
1007         :rtype pandas.Series
1008         """
1009
1010         try:
1011             if element["filter"] in ("all", "template"):
1012                 cond = "True"
1013             else:
1014                 cond = InputData._condition(element["filter"])
1015             logging.debug("   Filter: {0}".format(cond))
1016         except KeyError:
1017             logging.error("  No filter defined.")
1018             return None
1019
1020         if params is None:
1021             params = element.get("parameters", None)
1022
1023         data = pd.Series()
1024         try:
1025             for job, builds in element["data"].items():
1026                 data[job] = pd.Series()
1027                 for build in builds:
1028                     data[job][str(build)] = pd.Series()
1029                     try:
1030                         data_iter = self.data[job][str(build)][data_set].\
1031                             iteritems()
1032                     except KeyError:
1033                         if continue_on_error:
1034                             continue
1035                         else:
1036                             return None
1037                     for test_ID, test_data in data_iter:
1038                         if eval(cond, {"tags": test_data.get("tags", "")}):
1039                             data[job][str(build)][test_ID] = pd.Series()
1040                             if params is None:
1041                                 for param, val in test_data.items():
1042                                     data[job][str(build)][test_ID][param] = val
1043                             else:
1044                                 for param in params:
1045                                     try:
1046                                         data[job][str(build)][test_ID][param] =\
1047                                             test_data[param]
1048                                     except KeyError:
1049                                         data[job][str(build)][test_ID][param] =\
1050                                             "No Data"
1051             return data
1052
1053         except (KeyError, IndexError, ValueError) as err:
1054             logging.error("   Missing mandatory parameter in the element "
1055                           "specification: {0}".format(err))
1056             return None
1057         except AttributeError:
1058             return None
1059         except SyntaxError:
1060             logging.error("   The filter '{0}' is not correct. Check if all "
1061                           "tags are enclosed by apostrophes.".format(cond))
1062             return None
1063
1064     @staticmethod
1065     def merge_data(data):
1066         """Merge data from more jobs and builds to a simple data structure.
1067
1068         The output data structure is:
1069
1070         - test (suite) 1 ID:
1071           - param 1
1072           - param 2
1073           ...
1074           - param n
1075         ...
1076         - test (suite) n ID:
1077         ...
1078
1079         :param data: Data to merge.
1080         :type data: pandas.Series
1081         :returns: Merged data.
1082         :rtype: pandas.Series
1083         """
1084
1085         logging.info("    Merging data ...")
1086
1087         merged_data = pd.Series()
1088         for _, builds in data.iteritems():
1089             for _, item in builds.iteritems():
1090                 for ID, item_data in item.iteritems():
1091                     merged_data[ID] = item_data
1092
1093         return merged_data