cf132377748870a9609efe89ba4c947729351a53
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 """
20
21 import multiprocessing
22 import os
23 import re
24 import pandas as pd
25 import logging
26
27 from robot.api import ExecutionResult, ResultVisitor
28 from robot import errors
29 from collections import OrderedDict
30 from string import replace
31 from os import remove
32
33 from input_data_files import download_and_unzip_data_file
34 from utils import Worker
35
36
37 class ExecutionChecker(ResultVisitor):
38     """Class to traverse through the test suite structure.
39
40     The functionality implemented in this class generates a json structure:
41
42     Performance tests:
43
44     {
45         "metadata": {  # Optional
46             "version": "VPP version",
47             "job": "Jenkins job name",
48             "build": "Information about the build"
49         },
50         "suites": {
51             "Suite name 1": {
52                 "doc": "Suite 1 documentation",
53                 "parent": "Suite 1 parent",
54                 "level": "Level of the suite in the suite hierarchy"
55             }
56             "Suite name N": {
57                 "doc": "Suite N documentation",
58                 "parent": "Suite 2 parent",
59                 "level": "Level of the suite in the suite hierarchy"
60             }
61         }
62         "tests": {
63             "ID": {
64                 "name": "Test name",
65                 "parent": "Name of the parent of the test",
66                 "doc": "Test documentation"
67                 "msg": "Test message"
68                 "tags": ["tag 1", "tag 2", "tag n"],
69                 "type": "PDR" | "NDR",
70                 "throughput": {
71                     "value": int,
72                     "unit": "pps" | "bps" | "percentage"
73                 },
74                 "latency": {
75                     "direction1": {
76                         "100": {
77                             "min": int,
78                             "avg": int,
79                             "max": int
80                         },
81                         "50": {  # Only for NDR
82                             "min": int,
83                             "avg": int,
84                             "max": int
85                         },
86                         "10": {  # Only for NDR
87                             "min": int,
88                             "avg": int,
89                             "max": int
90                         }
91                     },
92                     "direction2": {
93                         "100": {
94                             "min": int,
95                             "avg": int,
96                             "max": int
97                         },
98                         "50": {  # Only for NDR
99                             "min": int,
100                             "avg": int,
101                             "max": int
102                         },
103                         "10": {  # Only for NDR
104                             "min": int,
105                             "avg": int,
106                             "max": int
107                         }
108                     }
109                 },
110                 "lossTolerance": "lossTolerance",  # Only for PDR
111                 "vat-history": "DUT1 and DUT2 VAT History"
112                 },
113                 "show-run": "Show Run"
114             },
115             "ID" {
116                 # next test
117             }
118         }
119     }
120
121     Functional tests:
122
123
124     {
125         "metadata": {  # Optional
126             "version": "VPP version",
127             "job": "Jenkins job name",
128             "build": "Information about the build"
129         },
130         "suites": {
131             "Suite name 1": {
132                 "doc": "Suite 1 documentation",
133                 "parent": "Suite 1 parent",
134                 "level": "Level of the suite in the suite hierarchy"
135             }
136             "Suite name N": {
137                 "doc": "Suite N documentation",
138                 "parent": "Suite 2 parent",
139                 "level": "Level of the suite in the suite hierarchy"
140             }
141         }
142         "tests": {
143             "ID": {
144                 "name": "Test name",
145                 "parent": "Name of the parent of the test",
146                 "doc": "Test documentation"
147                 "msg": "Test message"
148                 "tags": ["tag 1", "tag 2", "tag n"],
149                 "vat-history": "DUT1 and DUT2 VAT History"
150                 "show-run": "Show Run"
151                 "status": "PASS" | "FAIL"
152             },
153             "ID" {
154                 # next test
155             }
156         }
157     }
158
159     .. note:: ID is the lowercase full path to the test.
160     """
161
162     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
163
164     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
165                                r'LAT_\d+%NDR:\s\[\'(-?\d+\/-?\d+/-?\d+)\','
166                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
167                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
168                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
169                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
170                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
171
172     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
173                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
174                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
175
176     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
177                                  r'[\D\d]*')
178
179     REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
180
181     REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
182                                     r"(RTE Version: 'DPDK )(.*)(')")
183
184     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
185
186     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
187                            r'tx\s(\d*),\srx\s(\d*)')
188
189     def __init__(self, metadata):
190         """Initialisation.
191
192         :param metadata: Key-value pairs to be included in "metadata" part of
193         JSON structure.
194         :type metadata: dict
195         """
196
197         # Type of message to parse out from the test messages
198         self._msg_type = None
199
200         # VPP version
201         self._version = None
202
203         # Timestamp
204         self._timestamp = None
205
206         # Number of VAT History messages found:
207         # 0 - no message
208         # 1 - VAT History of DUT1
209         # 2 - VAT History of DUT2
210         self._lookup_kw_nr = 0
211         self._vat_history_lookup_nr = 0
212
213         # Number of Show Running messages found
214         # 0 - no message
215         # 1 - Show run message found
216         self._show_run_lookup_nr = 0
217
218         # Test ID of currently processed test- the lowercase full path to the
219         # test
220         self._test_ID = None
221
222         # The main data structure
223         self._data = {
224             "metadata": OrderedDict(),
225             "suites": OrderedDict(),
226             "tests": OrderedDict()
227         }
228
229         # Save the provided metadata
230         for key, val in metadata.items():
231             self._data["metadata"][key] = val
232
233         # Dictionary defining the methods used to parse different types of
234         # messages
235         self.parse_msg = {
236             "timestamp": self._get_timestamp,
237             "vpp-version": self._get_vpp_version,
238             "dpdk-version": self._get_dpdk_version,
239             "teardown-vat-history": self._get_vat_history,
240             "test-show-runtime": self._get_show_run
241         }
242
243     @property
244     def data(self):
245         """Getter - Data parsed from the XML file.
246
247         :returns: Data parsed from the XML file.
248         :rtype: dict
249         """
250         return self._data
251
252     def _get_vpp_version(self, msg):
253         """Called when extraction of VPP version is required.
254
255         :param msg: Message to process.
256         :type msg: Message
257         :returns: Nothing.
258         """
259
260         if msg.message.count("return STDOUT Version:"):
261             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
262                                 group(2))
263             self._data["metadata"]["version"] = self._version
264             self._msg_type = None
265
266     def _get_dpdk_version(self, msg):
267         """Called when extraction of DPDK version is required.
268
269         :param msg: Message to process.
270         :type msg: Message
271         :returns: Nothing.
272         """
273
274         if msg.message.count("return STDOUT testpmd"):
275             try:
276                 self._version = str(re.search(
277                     self.REGEX_VERSION_DPDK, msg.message). group(4))
278                 self._data["metadata"]["version"] = self._version
279             except IndexError:
280                 pass
281             finally:
282                 self._msg_type = None
283
284     def _get_timestamp(self, msg):
285         """Called when extraction of timestamp is required.
286
287         :param msg: Message to process.
288         :type msg: Message
289         :returns: Nothing.
290         """
291
292         self._timestamp = msg.timestamp[:14]
293         self._data["metadata"]["generated"] = self._timestamp
294         self._msg_type = None
295
296     def _get_vat_history(self, msg):
297         """Called when extraction of VAT command history is required.
298
299         :param msg: Message to process.
300         :type msg: Message
301         :returns: Nothing.
302         """
303         if msg.message.count("VAT command history:"):
304             self._vat_history_lookup_nr += 1
305             if self._vat_history_lookup_nr == 1:
306                 self._data["tests"][self._test_ID]["vat-history"] = str()
307             else:
308                 self._msg_type = None
309             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
310                           "VAT command history:", "", msg.message, count=1). \
311                 replace("\n\n", "\n").replace('\n', ' |br| ').\
312                 replace('\r', '').replace('"', "'")
313
314             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
315             self._data["tests"][self._test_ID]["vat-history"] += \
316                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
317
318     def _get_show_run(self, msg):
319         """Called when extraction of VPP operational data (output of CLI command
320         Show Runtime) is required.
321
322         :param msg: Message to process.
323         :type msg: Message
324         :returns: Nothing.
325         """
326         if msg.message.count("return STDOUT Thread "):
327             self._show_run_lookup_nr += 1
328             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
329                 self._data["tests"][self._test_ID]["show-run"] = str()
330             if self._lookup_kw_nr > 1:
331                 self._msg_type = None
332             if self._show_run_lookup_nr == 1:
333                 text = msg.message.replace("vat# ", "").\
334                     replace("return STDOUT ", "").replace("\n\n", "\n").\
335                     replace('\n', ' |br| ').\
336                     replace('\r', '').replace('"', "'")
337                 try:
338                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
339                     self._data["tests"][self._test_ID]["show-run"] += \
340                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
341                 except KeyError:
342                     pass
343
344     def _get_latency(self, msg, test_type):
345         """Get the latency data from the test message.
346
347         :param msg: Message to be parsed.
348         :param test_type: Type of the test - NDR or PDR.
349         :type msg: str
350         :type test_type: str
351         :returns: Latencies parsed from the message.
352         :rtype: dict
353         """
354
355         if test_type == "NDR":
356             groups = re.search(self.REGEX_LAT_NDR, msg)
357             groups_range = range(1, 7)
358         elif test_type == "PDR":
359             groups = re.search(self.REGEX_LAT_PDR, msg)
360             groups_range = range(1, 3)
361         else:
362             return {}
363
364         latencies = list()
365         for idx in groups_range:
366             try:
367                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
368             except (AttributeError, ValueError):
369                 lat = [-1, -1, -1]
370             latencies.append(lat)
371
372         keys = ("min", "avg", "max")
373         latency = {
374             "direction1": {
375             },
376             "direction2": {
377             }
378         }
379
380         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
381         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
382         if test_type == "NDR":
383             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
384             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
385             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
386             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
387
388         return latency
389
390     def visit_suite(self, suite):
391         """Implements traversing through the suite and its direct children.
392
393         :param suite: Suite to process.
394         :type suite: Suite
395         :returns: Nothing.
396         """
397         if self.start_suite(suite) is not False:
398             suite.suites.visit(self)
399             suite.tests.visit(self)
400             self.end_suite(suite)
401
402     def start_suite(self, suite):
403         """Called when suite starts.
404
405         :param suite: Suite to process.
406         :type suite: Suite
407         :returns: Nothing.
408         """
409
410         try:
411             parent_name = suite.parent.name
412         except AttributeError:
413             return
414
415         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
416             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
417         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
418
419         self._data["suites"][suite.longname.lower().replace('"', "'").
420             replace(" ", "_")] = {
421                 "name": suite.name.lower(),
422                 "doc": doc_str,
423                 "parent": parent_name,
424                 "level": len(suite.longname.split("."))
425             }
426
427         suite.keywords.visit(self)
428
429     def end_suite(self, suite):
430         """Called when suite ends.
431
432         :param suite: Suite to process.
433         :type suite: Suite
434         :returns: Nothing.
435         """
436         pass
437
438     def visit_test(self, test):
439         """Implements traversing through the test.
440
441         :param test: Test to process.
442         :type test: Test
443         :returns: Nothing.
444         """
445         if self.start_test(test) is not False:
446             test.keywords.visit(self)
447             self.end_test(test)
448
449     def start_test(self, test):
450         """Called when test starts.
451
452         :param test: Test to process.
453         :type test: Test
454         :returns: Nothing.
455         """
456
457         tags = [str(tag) for tag in test.tags]
458         test_result = dict()
459         test_result["name"] = test.name.lower()
460         test_result["parent"] = test.parent.name.lower()
461         test_result["tags"] = tags
462         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
463             replace('\r', '').replace('[', ' |br| [')
464         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
465         test_result["msg"] = test.message.replace('\n', ' |br| '). \
466             replace('\r', '').replace('"', "'")
467         test_result["status"] = test.status
468         if test.status == "PASS" and ("NDRPDRDISC" in tags or
469                                       "TCP" in tags or
470                                       "MRR" in tags):
471             if "NDRDISC" in tags:
472                 test_type = "NDR"
473             elif "PDRDISC" in tags:
474                 test_type = "PDR"
475             elif "TCP" in tags:
476                 test_type = "TCP"
477             elif "MRR" in tags:
478                 test_type = "MRR"
479             else:
480                 return
481
482             test_result["type"] = test_type
483
484             if test_type in ("NDR", "PDR"):
485                 try:
486                     rate_value = str(re.search(
487                         self.REGEX_RATE, test.message).group(1))
488                 except AttributeError:
489                     rate_value = "-1"
490                 try:
491                     rate_unit = str(re.search(
492                         self.REGEX_RATE, test.message).group(2))
493                 except AttributeError:
494                     rate_unit = "-1"
495
496                 test_result["throughput"] = dict()
497                 test_result["throughput"]["value"] = \
498                     int(rate_value.split('.')[0])
499                 test_result["throughput"]["unit"] = rate_unit
500                 test_result["latency"] = \
501                     self._get_latency(test.message, test_type)
502                 if test_type == "PDR":
503                     test_result["lossTolerance"] = str(re.search(
504                         self.REGEX_TOLERANCE, test.message).group(1))
505
506             elif test_type in ("TCP", ):
507                 groups = re.search(self.REGEX_TCP, test.message)
508                 test_result["result"] = dict()
509                 test_result["result"]["value"] = int(groups.group(2))
510                 test_result["result"]["unit"] = groups.group(1)
511
512             elif test_type in ("MRR", ):
513                 groups = re.search(self.REGEX_MRR, test.message)
514                 test_result["result"] = dict()
515                 test_result["result"]["duration"] = int(groups.group(1))
516                 test_result["result"]["tx"] = int(groups.group(2))
517                 test_result["result"]["rx"] = int(groups.group(3))
518                 test_result["result"]["throughput"] = int(
519                     test_result["result"]["rx"] /
520                     test_result["result"]["duration"])
521
522         self._test_ID = test.longname.lower()
523         self._data["tests"][self._test_ID] = test_result
524
525     def end_test(self, test):
526         """Called when test ends.
527
528         :param test: Test to process.
529         :type test: Test
530         :returns: Nothing.
531         """
532         pass
533
534     def visit_keyword(self, keyword):
535         """Implements traversing through the keyword and its child keywords.
536
537         :param keyword: Keyword to process.
538         :type keyword: Keyword
539         :returns: Nothing.
540         """
541         if self.start_keyword(keyword) is not False:
542             self.end_keyword(keyword)
543
544     def start_keyword(self, keyword):
545         """Called when keyword starts. Default implementation does nothing.
546
547         :param keyword: Keyword to process.
548         :type keyword: Keyword
549         :returns: Nothing.
550         """
551         try:
552             if keyword.type == "setup":
553                 self.visit_setup_kw(keyword)
554             elif keyword.type == "teardown":
555                 self._lookup_kw_nr = 0
556                 self.visit_teardown_kw(keyword)
557             else:
558                 self._lookup_kw_nr = 0
559                 self.visit_test_kw(keyword)
560         except AttributeError:
561             pass
562
563     def end_keyword(self, keyword):
564         """Called when keyword ends. Default implementation does nothing.
565
566         :param keyword: Keyword to process.
567         :type keyword: Keyword
568         :returns: Nothing.
569         """
570         pass
571
572     def visit_test_kw(self, test_kw):
573         """Implements traversing through the test keyword and its child
574         keywords.
575
576         :param test_kw: Keyword to process.
577         :type test_kw: Keyword
578         :returns: Nothing.
579         """
580         for keyword in test_kw.keywords:
581             if self.start_test_kw(keyword) is not False:
582                 self.visit_test_kw(keyword)
583                 self.end_test_kw(keyword)
584
585     def start_test_kw(self, test_kw):
586         """Called when test keyword starts. Default implementation does
587         nothing.
588
589         :param test_kw: Keyword to process.
590         :type test_kw: Keyword
591         :returns: Nothing.
592         """
593         if test_kw.name.count("Show Runtime Counters On All Duts"):
594             self._lookup_kw_nr += 1
595             self._show_run_lookup_nr = 0
596             self._msg_type = "test-show-runtime"
597         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
598             self._msg_type = "dpdk-version"
599         else:
600             return
601         test_kw.messages.visit(self)
602
603     def end_test_kw(self, test_kw):
604         """Called when keyword ends. Default implementation does nothing.
605
606         :param test_kw: Keyword to process.
607         :type test_kw: Keyword
608         :returns: Nothing.
609         """
610         pass
611
612     def visit_setup_kw(self, setup_kw):
613         """Implements traversing through the teardown keyword and its child
614         keywords.
615
616         :param setup_kw: Keyword to process.
617         :type setup_kw: Keyword
618         :returns: Nothing.
619         """
620         for keyword in setup_kw.keywords:
621             if self.start_setup_kw(keyword) is not False:
622                 self.visit_setup_kw(keyword)
623                 self.end_setup_kw(keyword)
624
625     def start_setup_kw(self, setup_kw):
626         """Called when teardown keyword starts. Default implementation does
627         nothing.
628
629         :param setup_kw: Keyword to process.
630         :type setup_kw: Keyword
631         :returns: Nothing.
632         """
633         if setup_kw.name.count("Show Vpp Version On All Duts") \
634                 and not self._version:
635             self._msg_type = "vpp-version"
636
637         elif setup_kw.name.count("Setup performance global Variables") \
638                 and not self._timestamp:
639             self._msg_type = "timestamp"
640         else:
641             return
642         setup_kw.messages.visit(self)
643
644     def end_setup_kw(self, setup_kw):
645         """Called when keyword ends. Default implementation does nothing.
646
647         :param setup_kw: Keyword to process.
648         :type setup_kw: Keyword
649         :returns: Nothing.
650         """
651         pass
652
653     def visit_teardown_kw(self, teardown_kw):
654         """Implements traversing through the teardown keyword and its child
655         keywords.
656
657         :param teardown_kw: Keyword to process.
658         :type teardown_kw: Keyword
659         :returns: Nothing.
660         """
661         for keyword in teardown_kw.keywords:
662             if self.start_teardown_kw(keyword) is not False:
663                 self.visit_teardown_kw(keyword)
664                 self.end_teardown_kw(keyword)
665
666     def start_teardown_kw(self, teardown_kw):
667         """Called when teardown keyword starts. Default implementation does
668         nothing.
669
670         :param teardown_kw: Keyword to process.
671         :type teardown_kw: Keyword
672         :returns: Nothing.
673         """
674
675         if teardown_kw.name.count("Show Vat History On All Duts"):
676             self._vat_history_lookup_nr = 0
677             self._msg_type = "teardown-vat-history"
678             teardown_kw.messages.visit(self)
679
680     def end_teardown_kw(self, teardown_kw):
681         """Called when keyword ends. Default implementation does nothing.
682
683         :param teardown_kw: Keyword to process.
684         :type teardown_kw: Keyword
685         :returns: Nothing.
686         """
687         pass
688
689     def visit_message(self, msg):
690         """Implements visiting the message.
691
692         :param msg: Message to process.
693         :type msg: Message
694         :returns: Nothing.
695         """
696         if self.start_message(msg) is not False:
697             self.end_message(msg)
698
699     def start_message(self, msg):
700         """Called when message starts. Get required information from messages:
701         - VPP version.
702
703         :param msg: Message to process.
704         :type msg: Message
705         :returns: Nothing.
706         """
707
708         if self._msg_type:
709             self.parse_msg[self._msg_type](msg)
710
711     def end_message(self, msg):
712         """Called when message ends. Default implementation does nothing.
713
714         :param msg: Message to process.
715         :type msg: Message
716         :returns: Nothing.
717         """
718         pass
719
720
721 class InputData(object):
722     """Input data
723
724     The data is extracted from output.xml files generated by Jenkins jobs and
725     stored in pandas' DataFrames.
726
727     The data structure:
728     - job name
729       - build number
730         - metadata
731           - job
732           - build
733           - vpp version
734         - suites
735         - tests
736           - ID: test data (as described in ExecutionChecker documentation)
737     """
738
739     def __init__(self, spec):
740         """Initialization.
741
742         :param spec: Specification.
743         :type spec: Specification
744         """
745
746         # Specification:
747         self._cfg = spec
748
749         # Data store:
750         self._input_data = pd.Series()
751
752     @property
753     def data(self):
754         """Getter - Input data.
755
756         :returns: Input data
757         :rtype: pandas.Series
758         """
759         return self._input_data
760
761     def metadata(self, job, build):
762         """Getter - metadata
763
764         :param job: Job which metadata we want.
765         :param build: Build which metadata we want.
766         :type job: str
767         :type build: str
768         :returns: Metadata
769         :rtype: pandas.Series
770         """
771
772         return self.data[job][build]["metadata"]
773
774     def suites(self, job, build):
775         """Getter - suites
776
777         :param job: Job which suites we want.
778         :param build: Build which suites we want.
779         :type job: str
780         :type build: str
781         :returns: Suites.
782         :rtype: pandas.Series
783         """
784
785         return self.data[job][str(build)]["suites"]
786
787     def tests(self, job, build):
788         """Getter - tests
789
790         :param job: Job which tests we want.
791         :param build: Build which tests we want.
792         :type job: str
793         :type build: str
794         :returns: Tests.
795         :rtype: pandas.Series
796         """
797
798         return self.data[job][build]["tests"]
799
800     @staticmethod
801     def _parse_tests(job, build, log):
802         """Process data from robot output.xml file and return JSON structured
803         data.
804
805         :param job: The name of job which build output data will be processed.
806         :param build: The build which output data will be processed.
807         :param log: List of log messages.
808         :type job: str
809         :type build: dict
810         :type log: list of tuples (severity, msg)
811         :returns: JSON data structure.
812         :rtype: dict
813         """
814
815         metadata = {
816             "job": job,
817             "build": build
818         }
819
820         with open(build["file-name"], 'r') as data_file:
821             try:
822                 result = ExecutionResult(data_file)
823             except errors.DataError as err:
824                 log.append(("ERROR", "Error occurred while parsing output.xml: "
825                                      "{0}".format(err)))
826                 return None
827         checker = ExecutionChecker(metadata)
828         result.visit(checker)
829
830         return checker.data
831
832     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
833         """Download and parse the input data file.
834
835         :param pid: PID of the process executing this method.
836         :param data_queue: Shared memory between processes. Queue which keeps
837             the result data. This data is then read by the main process and used
838             in further processing.
839         :param job: Name of the Jenkins job which generated the processed input
840             file.
841         :param build: Information about the Jenkins build which generated the
842             processed input file.
843         :param repeat: Repeat the download specified number of times if not
844             successful.
845         :type pid: int
846         :type data_queue: multiprocessing.Manager().Queue()
847         :type job: str
848         :type build: dict
849         :type repeat: int
850         """
851
852         logs = list()
853
854         logging.info("  Processing the job/build: {0}: {1}".
855                      format(job, build["build"]))
856
857         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
858                      format(job, build["build"])))
859
860         state = "failed"
861         success = False
862         data = None
863         do_repeat = repeat
864         while do_repeat:
865             success = download_and_unzip_data_file(self._cfg, job, build, pid,
866                                                    logs)
867             if success:
868                 break
869             do_repeat -= 1
870         if not success:
871             logs.append(("ERROR", "It is not possible to download the input "
872                                   "data file from the job '{job}', build "
873                                   "'{build}', or it is damaged. Skipped.".
874                          format(job=job, build=build["build"])))
875         if success:
876             logs.append(("INFO", "  Processing data from the build '{0}' ...".
877                          format(build["build"])))
878             data = InputData._parse_tests(job, build, logs)
879             if data is None:
880                 logs.append(("ERROR", "Input data file from the job '{job}', "
881                                       "build '{build}' is damaged. Skipped.".
882                              format(job=job, build=build["build"])))
883             else:
884                 state = "processed"
885
886             try:
887                 remove(build["file-name"])
888             except OSError as err:
889                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
890                              format(build["file-name"], err)))
891         logs.append(("INFO", "  Done."))
892
893         result = {
894             "data": data,
895             "state": state,
896             "job": job,
897             "build": build,
898             "logs": logs
899         }
900         data_queue.put(result)
901
902     def download_and_parse_data(self, repeat=1):
903         """Download the input data files, parse input data from input files and
904         store in pandas' Series.
905
906         :param repeat: Repeat the download specified number of times if not
907             successful.
908         :type repeat: int
909         """
910
911         logging.info("Downloading and parsing input files ...")
912
913         work_queue = multiprocessing.JoinableQueue()
914         manager = multiprocessing.Manager()
915         data_queue = manager.Queue()
916         cpus = multiprocessing.cpu_count()
917
918         workers = list()
919         for cpu in range(cpus):
920             worker = Worker(work_queue,
921                             data_queue,
922                             self._download_and_parse_build)
923             worker.daemon = True
924             worker.start()
925             workers.append(worker)
926             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
927                       format(cpu, worker.pid))
928
929         for job, builds in self._cfg.builds.items():
930             for build in builds:
931                 work_queue.put((job, build, repeat))
932
933         work_queue.join()
934
935         logging.info("Done.")
936
937         while not data_queue.empty():
938             result = data_queue.get()
939
940             job = result["job"]
941             build_nr = result["build"]["build"]
942
943             if result["data"]:
944                 data = result["data"]
945                 build_data = pd.Series({
946                     "metadata": pd.Series(data["metadata"].values(),
947                                           index=data["metadata"].keys()),
948                     "suites": pd.Series(data["suites"].values(),
949                                         index=data["suites"].keys()),
950                     "tests": pd.Series(data["tests"].values(),
951                                        index=data["tests"].keys())})
952
953                 if self._input_data.get(job, None) is None:
954                     self._input_data[job] = pd.Series()
955                 self._input_data[job][str(build_nr)] = build_data
956
957                 self._cfg.set_input_file_name(job, build_nr,
958                                               result["build"]["file-name"])
959
960             self._cfg.set_input_state(job, build_nr, result["state"])
961
962             for item in result["logs"]:
963                 if item[0] == "INFO":
964                     logging.info(item[1])
965                 elif item[0] == "ERROR":
966                     logging.error(item[1])
967                 elif item[0] == "DEBUG":
968                     logging.debug(item[1])
969                 elif item[0] == "CRITICAL":
970                     logging.critical(item[1])
971                 elif item[0] == "WARNING":
972                     logging.warning(item[1])
973
974         del data_queue
975
976         # Terminate all workers
977         for worker in workers:
978             worker.terminate()
979             worker.join()
980
981         logging.info("Done.")
982
983     @staticmethod
984     def _end_of_tag(tag_filter, start=0, closer="'"):
985         """Return the index of character in the string which is the end of tag.
986
987         :param tag_filter: The string where the end of tag is being searched.
988         :param start: The index where the searching is stated.
989         :param closer: The character which is the tag closer.
990         :type tag_filter: str
991         :type start: int
992         :type closer: str
993         :returns: The index of the tag closer.
994         :rtype: int
995         """
996
997         try:
998             idx_opener = tag_filter.index(closer, start)
999             return tag_filter.index(closer, idx_opener + 1)
1000         except ValueError:
1001             return None
1002
1003     @staticmethod
1004     def _condition(tag_filter):
1005         """Create a conditional statement from the given tag filter.
1006
1007         :param tag_filter: Filter based on tags from the element specification.
1008         :type tag_filter: str
1009         :returns: Conditional statement which can be evaluated.
1010         :rtype: str
1011         """
1012
1013         index = 0
1014         while True:
1015             index = InputData._end_of_tag(tag_filter, index)
1016             if index is None:
1017                 return tag_filter
1018             index += 1
1019             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1020
1021     def filter_data(self, element, params=None, data_set="tests",
1022                     continue_on_error=False):
1023         """Filter required data from the given jobs and builds.
1024
1025         The output data structure is:
1026
1027         - job 1
1028           - build 1
1029             - test (suite) 1 ID:
1030               - param 1
1031               - param 2
1032               ...
1033               - param n
1034             ...
1035             - test (suite) n ID:
1036             ...
1037           ...
1038           - build n
1039         ...
1040         - job n
1041
1042         :param element: Element which will use the filtered data.
1043         :param params: Parameters which will be included in the output. If None,
1044         all parameters are included.
1045         :param data_set: The set of data to be filtered: tests, suites,
1046         metadata.
1047         :param continue_on_error: Continue if there is error while reading the
1048         data. The Item will be empty then
1049         :type element: pandas.Series
1050         :type params: list
1051         :type data_set: str
1052         :type continue_on_error: bool
1053         :returns: Filtered data.
1054         :rtype pandas.Series
1055         """
1056
1057         try:
1058             if element["filter"] in ("all", "template"):
1059                 cond = "True"
1060             else:
1061                 cond = InputData._condition(element["filter"])
1062             logging.debug("   Filter: {0}".format(cond))
1063         except KeyError:
1064             logging.error("  No filter defined.")
1065             return None
1066
1067         if params is None:
1068             params = element.get("parameters", None)
1069
1070         data = pd.Series()
1071         try:
1072             for job, builds in element["data"].items():
1073                 data[job] = pd.Series()
1074                 for build in builds:
1075                     data[job][str(build)] = pd.Series()
1076                     try:
1077                         data_iter = self.data[job][str(build)][data_set].\
1078                             iteritems()
1079                     except KeyError:
1080                         if continue_on_error:
1081                             continue
1082                         else:
1083                             return None
1084                     for test_ID, test_data in data_iter:
1085                         if eval(cond, {"tags": test_data.get("tags", "")}):
1086                             data[job][str(build)][test_ID] = pd.Series()
1087                             if params is None:
1088                                 for param, val in test_data.items():
1089                                     data[job][str(build)][test_ID][param] = val
1090                             else:
1091                                 for param in params:
1092                                     try:
1093                                         data[job][str(build)][test_ID][param] =\
1094                                             test_data[param]
1095                                     except KeyError:
1096                                         data[job][str(build)][test_ID][param] =\
1097                                             "No Data"
1098             return data
1099
1100         except (KeyError, IndexError, ValueError) as err:
1101             logging.error("   Missing mandatory parameter in the element "
1102                           "specification: {0}".format(err))
1103             return None
1104         except AttributeError:
1105             return None
1106         except SyntaxError:
1107             logging.error("   The filter '{0}' is not correct. Check if all "
1108                           "tags are enclosed by apostrophes.".format(cond))
1109             return None
1110
1111     @staticmethod
1112     def merge_data(data):
1113         """Merge data from more jobs and builds to a simple data structure.
1114
1115         The output data structure is:
1116
1117         - test (suite) 1 ID:
1118           - param 1
1119           - param 2
1120           ...
1121           - param n
1122         ...
1123         - test (suite) n ID:
1124         ...
1125
1126         :param data: Data to merge.
1127         :type data: pandas.Series
1128         :returns: Merged data.
1129         :rtype: pandas.Series
1130         """
1131
1132         logging.info("    Merging data ...")
1133
1134         merged_data = pd.Series()
1135         for _, builds in data.iteritems():
1136             for _, item in builds.iteritems():
1137                 for ID, item_data in item.iteritems():
1138                     merged_data[ID] = item_data
1139
1140         return merged_data