CSIT-1183: Process the new and the old TC names
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
34
35 from input_data_files import download_and_unzip_data_file
36 from utils import Worker
37
38
39 class ExecutionChecker(ResultVisitor):
40     """Class to traverse through the test suite structure.
41
42     The functionality implemented in this class generates a json structure:
43
44     Performance tests:
45
46     {
47         "metadata": {
48             "generated": "Timestamp",
49             "version": "SUT version",
50             "job": "Jenkins job name",
51             "build": "Information about the build"
52         },
53         "suites": {
54             "Suite long name 1": {
55                 "name": Suite name,
56                 "doc": "Suite 1 documentation",
57                 "parent": "Suite 1 parent",
58                 "level": "Level of the suite in the suite hierarchy"
59             }
60             "Suite long name N": {
61                 "name": Suite name,
62                 "doc": "Suite N documentation",
63                 "parent": "Suite 2 parent",
64                 "level": "Level of the suite in the suite hierarchy"
65             }
66         }
67         "tests": {
68             "ID": {
69                 "name": "Test name",
70                 "parent": "Name of the parent of the test",
71                 "doc": "Test documentation"
72                 "msg": "Test message"
73                 "tags": ["tag 1", "tag 2", "tag n"],
74                 "type": "PDR" | "NDR" | "TCP" | "MRR" | "BMRR",
75                 "throughput": {  # Only type: "PDR" | "NDR"
76                     "value": int,
77                     "unit": "pps" | "bps" | "percentage"
78                 },
79                 "latency": {  # Only type: "PDR" | "NDR"
80                     "direction1": {
81                         "100": {
82                             "min": int,
83                             "avg": int,
84                             "max": int
85                         },
86                         "50": {  # Only for NDR
87                             "min": int,
88                             "avg": int,
89                             "max": int
90                         },
91                         "10": {  # Only for NDR
92                             "min": int,
93                             "avg": int,
94                             "max": int
95                         }
96                     },
97                     "direction2": {
98                         "100": {
99                             "min": int,
100                             "avg": int,
101                             "max": int
102                         },
103                         "50": {  # Only for NDR
104                             "min": int,
105                             "avg": int,
106                             "max": int
107                         },
108                         "10": {  # Only for NDR
109                             "min": int,
110                             "avg": int,
111                             "max": int
112                         }
113                     }
114                 },
115                 "result": {  # Only type: "TCP"
116                     "value": int,
117                     "unit": "cps" | "rps"
118                 },
119                 "result": {  # Only type: "MRR" | "BMRR"
120                     "receive-rate": AvgStdevMetadata,
121                 },
122                 "lossTolerance": "lossTolerance",  # Only type: "PDR"
123                 "vat-history": "DUT1 and DUT2 VAT History"
124                 "show-run": "Show Run"
125             },
126             "ID" {
127                 # next test
128             }
129         }
130     }
131
132
133     Functional tests:
134
135     {
136         "metadata": {  # Optional
137             "version": "VPP version",
138             "job": "Jenkins job name",
139             "build": "Information about the build"
140         },
141         "suites": {
142             "Suite name 1": {
143                 "doc": "Suite 1 documentation",
144                 "parent": "Suite 1 parent",
145                 "level": "Level of the suite in the suite hierarchy"
146             }
147             "Suite name N": {
148                 "doc": "Suite N documentation",
149                 "parent": "Suite 2 parent",
150                 "level": "Level of the suite in the suite hierarchy"
151             }
152         }
153         "tests": {
154             "ID": {
155                 "name": "Test name",
156                 "parent": "Name of the parent of the test",
157                 "doc": "Test documentation"
158                 "msg": "Test message"
159                 "tags": ["tag 1", "tag 2", "tag n"],
160                 "vat-history": "DUT1 and DUT2 VAT History"
161                 "show-run": "Show Run"
162                 "status": "PASS" | "FAIL"
163             },
164             "ID" {
165                 # next test
166             }
167         }
168     }
169
170     .. note:: ID is the lowercase full path to the test.
171     """
172
173     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
174
175     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
176                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
177                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
178                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
179                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
180                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
181                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
182
183     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
184                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
185                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
186
187     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
188                                  r'[\D\d]*')
189
190     REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
191
192     REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
193                                     r"(RTE Version: 'DPDK )(.*)(')")
194
195     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
196
197     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
198                            r'tx\s(\d*),\srx\s(\d*)')
199
200     REGEX_BMRR = re.compile(r'Maximum Receive Rate Results \[(.*)\]')
201
202     REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
203
204     REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')
205
206     REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
207
208     def __init__(self, metadata):
209         """Initialisation.
210
211         :param metadata: Key-value pairs to be included in "metadata" part of
212         JSON structure.
213         :type metadata: dict
214         """
215
216         # Type of message to parse out from the test messages
217         self._msg_type = None
218
219         # VPP version
220         self._version = None
221
222         # Timestamp
223         self._timestamp = None
224
225         # Number of VAT History messages found:
226         # 0 - no message
227         # 1 - VAT History of DUT1
228         # 2 - VAT History of DUT2
229         self._lookup_kw_nr = 0
230         self._vat_history_lookup_nr = 0
231
232         # Number of Show Running messages found
233         # 0 - no message
234         # 1 - Show run message found
235         self._show_run_lookup_nr = 0
236
237         # Test ID of currently processed test- the lowercase full path to the
238         # test
239         self._test_ID = None
240
241         # The main data structure
242         self._data = {
243             "metadata": OrderedDict(),
244             "suites": OrderedDict(),
245             "tests": OrderedDict()
246         }
247
248         # Save the provided metadata
249         for key, val in metadata.items():
250             self._data["metadata"][key] = val
251
252         # Dictionary defining the methods used to parse different types of
253         # messages
254         self.parse_msg = {
255             "timestamp": self._get_timestamp,
256             "vpp-version": self._get_vpp_version,
257             "dpdk-version": self._get_dpdk_version,
258             "teardown-vat-history": self._get_vat_history,
259             "test-show-runtime": self._get_show_run
260         }
261
262     @property
263     def data(self):
264         """Getter - Data parsed from the XML file.
265
266         :returns: Data parsed from the XML file.
267         :rtype: dict
268         """
269         return self._data
270
271     def _get_vpp_version(self, msg):
272         """Called when extraction of VPP version is required.
273
274         :param msg: Message to process.
275         :type msg: Message
276         :returns: Nothing.
277         """
278
279         if msg.message.count("return STDOUT Version:"):
280             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
281                                 group(2))
282             self._data["metadata"]["version"] = self._version
283             self._msg_type = None
284
285     def _get_dpdk_version(self, msg):
286         """Called when extraction of DPDK version is required.
287
288         :param msg: Message to process.
289         :type msg: Message
290         :returns: Nothing.
291         """
292
293         if msg.message.count("return STDOUT testpmd"):
294             try:
295                 self._version = str(re.search(
296                     self.REGEX_VERSION_DPDK, msg.message). group(4))
297                 self._data["metadata"]["version"] = self._version
298             except IndexError:
299                 pass
300             finally:
301                 self._msg_type = None
302
303     def _get_timestamp(self, msg):
304         """Called when extraction of timestamp is required.
305
306         :param msg: Message to process.
307         :type msg: Message
308         :returns: Nothing.
309         """
310
311         self._timestamp = msg.timestamp[:14]
312         self._data["metadata"]["generated"] = self._timestamp
313         self._msg_type = None
314
315     def _get_vat_history(self, msg):
316         """Called when extraction of VAT command history is required.
317
318         :param msg: Message to process.
319         :type msg: Message
320         :returns: Nothing.
321         """
322         if msg.message.count("VAT command history:"):
323             self._vat_history_lookup_nr += 1
324             if self._vat_history_lookup_nr == 1:
325                 self._data["tests"][self._test_ID]["vat-history"] = str()
326             else:
327                 self._msg_type = None
328             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
329                           "VAT command history:", "", msg.message, count=1). \
330                 replace("\n\n", "\n").replace('\n', ' |br| ').\
331                 replace('\r', '').replace('"', "'")
332
333             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
334             self._data["tests"][self._test_ID]["vat-history"] += \
335                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
336
337     def _get_show_run(self, msg):
338         """Called when extraction of VPP operational data (output of CLI command
339         Show Runtime) is required.
340
341         :param msg: Message to process.
342         :type msg: Message
343         :returns: Nothing.
344         """
345         if msg.message.count("return STDOUT Thread "):
346             self._show_run_lookup_nr += 1
347             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
348                 self._data["tests"][self._test_ID]["show-run"] = str()
349             if self._lookup_kw_nr > 1:
350                 self._msg_type = None
351             if self._show_run_lookup_nr == 1:
352                 text = msg.message.replace("vat# ", "").\
353                     replace("return STDOUT ", "").replace("\n\n", "\n").\
354                     replace('\n', ' |br| ').\
355                     replace('\r', '').replace('"', "'")
356                 try:
357                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
358                     self._data["tests"][self._test_ID]["show-run"] += \
359                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
360                 except KeyError:
361                     pass
362
363     def _get_latency(self, msg, test_type):
364         """Get the latency data from the test message.
365
366         :param msg: Message to be parsed.
367         :param test_type: Type of the test - NDR or PDR.
368         :type msg: str
369         :type test_type: str
370         :returns: Latencies parsed from the message.
371         :rtype: dict
372         """
373
374         if test_type == "NDR":
375             groups = re.search(self.REGEX_LAT_NDR, msg)
376             groups_range = range(1, 7)
377         elif test_type == "PDR":
378             groups = re.search(self.REGEX_LAT_PDR, msg)
379             groups_range = range(1, 3)
380         else:
381             return {}
382
383         latencies = list()
384         for idx in groups_range:
385             try:
386                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
387             except (AttributeError, ValueError):
388                 lat = [-1, -1, -1]
389             latencies.append(lat)
390
391         keys = ("min", "avg", "max")
392         latency = {
393             "direction1": {
394             },
395             "direction2": {
396             }
397         }
398
399         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
400         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
401         if test_type == "NDR":
402             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
403             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
404             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
405             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
406
407         return latency
408
409     def visit_suite(self, suite):
410         """Implements traversing through the suite and its direct children.
411
412         :param suite: Suite to process.
413         :type suite: Suite
414         :returns: Nothing.
415         """
416         if self.start_suite(suite) is not False:
417             suite.suites.visit(self)
418             suite.tests.visit(self)
419             self.end_suite(suite)
420
421     def start_suite(self, suite):
422         """Called when suite starts.
423
424         :param suite: Suite to process.
425         :type suite: Suite
426         :returns: Nothing.
427         """
428
429         try:
430             parent_name = suite.parent.name
431         except AttributeError:
432             return
433
434         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
435             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
436         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
437
438         self._data["suites"][suite.longname.lower().replace('"', "'").
439             replace(" ", "_")] = {
440                 "name": suite.name.lower(),
441                 "doc": doc_str,
442                 "parent": parent_name,
443                 "level": len(suite.longname.split("."))
444             }
445
446         suite.keywords.visit(self)
447
448     def end_suite(self, suite):
449         """Called when suite ends.
450
451         :param suite: Suite to process.
452         :type suite: Suite
453         :returns: Nothing.
454         """
455         pass
456
457     def visit_test(self, test):
458         """Implements traversing through the test.
459
460         :param test: Test to process.
461         :type test: Test
462         :returns: Nothing.
463         """
464         if self.start_test(test) is not False:
465             test.keywords.visit(self)
466             self.end_test(test)
467
468     def start_test(self, test):
469         """Called when test starts.
470
471         :param test: Test to process.
472         :type test: Test
473         :returns: Nothing.
474         """
475
476         tags = [str(tag) for tag in test.tags]
477         test_result = dict()
478         test_result["name"] = test.name.lower()
479         test_result["parent"] = test.parent.name.lower()
480         test_result["tags"] = tags
481         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
482             replace('\r', '').replace('[', ' |br| [')
483         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
484         test_result["msg"] = test.message.replace('\n', ' |br| '). \
485             replace('\r', '').replace('"', "'")
486         test_result["status"] = test.status
487         self._test_ID = test.longname.lower()
488         if test.status == "PASS" and ("NDRPDRDISC" in tags or
489                                       "TCP" in tags or
490                                       "MRR" in tags or
491                                       "BMRR" in tags):
492             if "NDRDISC" in tags:
493                 test_type = "NDR"
494             elif "PDRDISC" in tags:
495                 test_type = "PDR"
496             elif "TCP" in tags:
497                 test_type = "TCP"
498             elif "MRR" in tags:
499                 test_type = "MRR"
500             elif "FRMOBL" in tags or "BMRR" in tags:
501                 test_type = "BMRR"
502             else:
503                 return
504
505             test_result["type"] = test_type
506
507             # Replace info about cores (e.g. -1c-) with the info about threads
508             # and cores (e.g. -1t1c-) in the long test case names and in the
509             # test case names if necessary.
510             groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
511             if not groups:
512                 tag_count = 0
513                 for tag in test_result["tags"]:
514                     groups = re.search(self.REGEX_TC_TAG, tag)
515                     if groups:
516                         tag_count += 1
517                         tag_tc = tag
518
519                 if tag_count == 1:
520                     self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
521                                            "-{0}-".format(tag_tc.lower()),
522                                            self._test_ID,
523                                            count=1)
524                     test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
525                                                  "-{0}-".format(tag_tc.lower()),
526                                                  test_result["name"],
527                                                  count=1)
528                 else:
529                     test_result["status"] = "FAIL"
530                     logging.error("The test '{0}' has no or more than one "
531                                   "multi-threading tags.".format(self._test_ID))
532                     return
533
534             if test_type in ("NDR", "PDR"):
535                 try:
536                     rate_value = str(re.search(
537                         self.REGEX_RATE, test.message).group(1))
538                 except AttributeError:
539                     rate_value = "-1"
540                 try:
541                     rate_unit = str(re.search(
542                         self.REGEX_RATE, test.message).group(2))
543                 except AttributeError:
544                     rate_unit = "-1"
545
546                 test_result["throughput"] = dict()
547                 test_result["throughput"]["value"] = \
548                     int(rate_value.split('.')[0])
549                 test_result["throughput"]["unit"] = rate_unit
550                 test_result["latency"] = \
551                     self._get_latency(test.message, test_type)
552                 if test_type == "PDR":
553                     test_result["lossTolerance"] = str(re.search(
554                         self.REGEX_TOLERANCE, test.message).group(1))
555
556             elif test_type in ("TCP", ):
557                 groups = re.search(self.REGEX_TCP, test.message)
558                 test_result["result"] = dict()
559                 test_result["result"]["value"] = int(groups.group(2))
560                 test_result["result"]["unit"] = groups.group(1)
561
562             elif test_type in ("MRR", "BMRR"):
563                 test_result["result"] = dict()
564                 groups = re.search(self.REGEX_BMRR, test.message)
565                 if groups is not None:
566                     items_str = groups.group(1)
567                     items_float = [float(item.strip()) for item
568                                    in items_str.split(",")]
569                     test_result["result"]["receive-rate"] = \
570                         AvgStdevMetadataFactory.from_data(items_float)
571                 else:
572                     groups = re.search(self.REGEX_MRR, test.message)
573                     test_result["result"]["receive-rate"] = \
574                         AvgStdevMetadataFactory.from_data([
575                             float(groups.group(3)) / float(groups.group(1)), ])
576
577         self._data["tests"][self._test_ID] = test_result
578
579     def end_test(self, test):
580         """Called when test ends.
581
582         :param test: Test to process.
583         :type test: Test
584         :returns: Nothing.
585         """
586         pass
587
588     def visit_keyword(self, keyword):
589         """Implements traversing through the keyword and its child keywords.
590
591         :param keyword: Keyword to process.
592         :type keyword: Keyword
593         :returns: Nothing.
594         """
595         if self.start_keyword(keyword) is not False:
596             self.end_keyword(keyword)
597
598     def start_keyword(self, keyword):
599         """Called when keyword starts. Default implementation does nothing.
600
601         :param keyword: Keyword to process.
602         :type keyword: Keyword
603         :returns: Nothing.
604         """
605         try:
606             if keyword.type == "setup":
607                 self.visit_setup_kw(keyword)
608             elif keyword.type == "teardown":
609                 self._lookup_kw_nr = 0
610                 self.visit_teardown_kw(keyword)
611             else:
612                 self._lookup_kw_nr = 0
613                 self.visit_test_kw(keyword)
614         except AttributeError:
615             pass
616
617     def end_keyword(self, keyword):
618         """Called when keyword ends. Default implementation does nothing.
619
620         :param keyword: Keyword to process.
621         :type keyword: Keyword
622         :returns: Nothing.
623         """
624         pass
625
626     def visit_test_kw(self, test_kw):
627         """Implements traversing through the test keyword and its child
628         keywords.
629
630         :param test_kw: Keyword to process.
631         :type test_kw: Keyword
632         :returns: Nothing.
633         """
634         for keyword in test_kw.keywords:
635             if self.start_test_kw(keyword) is not False:
636                 self.visit_test_kw(keyword)
637                 self.end_test_kw(keyword)
638
639     def start_test_kw(self, test_kw):
640         """Called when test keyword starts. Default implementation does
641         nothing.
642
643         :param test_kw: Keyword to process.
644         :type test_kw: Keyword
645         :returns: Nothing.
646         """
647         if test_kw.name.count("Show Runtime Counters On All Duts"):
648             self._lookup_kw_nr += 1
649             self._show_run_lookup_nr = 0
650             self._msg_type = "test-show-runtime"
651         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
652             self._msg_type = "dpdk-version"
653         else:
654             return
655         test_kw.messages.visit(self)
656
657     def end_test_kw(self, test_kw):
658         """Called when keyword ends. Default implementation does nothing.
659
660         :param test_kw: Keyword to process.
661         :type test_kw: Keyword
662         :returns: Nothing.
663         """
664         pass
665
666     def visit_setup_kw(self, setup_kw):
667         """Implements traversing through the teardown keyword and its child
668         keywords.
669
670         :param setup_kw: Keyword to process.
671         :type setup_kw: Keyword
672         :returns: Nothing.
673         """
674         for keyword in setup_kw.keywords:
675             if self.start_setup_kw(keyword) is not False:
676                 self.visit_setup_kw(keyword)
677                 self.end_setup_kw(keyword)
678
679     def start_setup_kw(self, setup_kw):
680         """Called when teardown keyword starts. Default implementation does
681         nothing.
682
683         :param setup_kw: Keyword to process.
684         :type setup_kw: Keyword
685         :returns: Nothing.
686         """
687         if setup_kw.name.count("Show Vpp Version On All Duts") \
688                 and not self._version:
689             self._msg_type = "vpp-version"
690
691         elif setup_kw.name.count("Setup performance global Variables") \
692                 and not self._timestamp:
693             self._msg_type = "timestamp"
694         else:
695             return
696         setup_kw.messages.visit(self)
697
698     def end_setup_kw(self, setup_kw):
699         """Called when keyword ends. Default implementation does nothing.
700
701         :param setup_kw: Keyword to process.
702         :type setup_kw: Keyword
703         :returns: Nothing.
704         """
705         pass
706
707     def visit_teardown_kw(self, teardown_kw):
708         """Implements traversing through the teardown keyword and its child
709         keywords.
710
711         :param teardown_kw: Keyword to process.
712         :type teardown_kw: Keyword
713         :returns: Nothing.
714         """
715         for keyword in teardown_kw.keywords:
716             if self.start_teardown_kw(keyword) is not False:
717                 self.visit_teardown_kw(keyword)
718                 self.end_teardown_kw(keyword)
719
720     def start_teardown_kw(self, teardown_kw):
721         """Called when teardown keyword starts. Default implementation does
722         nothing.
723
724         :param teardown_kw: Keyword to process.
725         :type teardown_kw: Keyword
726         :returns: Nothing.
727         """
728
729         if teardown_kw.name.count("Show Vat History On All Duts"):
730             self._vat_history_lookup_nr = 0
731             self._msg_type = "teardown-vat-history"
732             teardown_kw.messages.visit(self)
733
734     def end_teardown_kw(self, teardown_kw):
735         """Called when keyword ends. Default implementation does nothing.
736
737         :param teardown_kw: Keyword to process.
738         :type teardown_kw: Keyword
739         :returns: Nothing.
740         """
741         pass
742
743     def visit_message(self, msg):
744         """Implements visiting the message.
745
746         :param msg: Message to process.
747         :type msg: Message
748         :returns: Nothing.
749         """
750         if self.start_message(msg) is not False:
751             self.end_message(msg)
752
753     def start_message(self, msg):
754         """Called when message starts. Get required information from messages:
755         - VPP version.
756
757         :param msg: Message to process.
758         :type msg: Message
759         :returns: Nothing.
760         """
761
762         if self._msg_type:
763             self.parse_msg[self._msg_type](msg)
764
765     def end_message(self, msg):
766         """Called when message ends. Default implementation does nothing.
767
768         :param msg: Message to process.
769         :type msg: Message
770         :returns: Nothing.
771         """
772         pass
773
774
775 class InputData(object):
776     """Input data
777
778     The data is extracted from output.xml files generated by Jenkins jobs and
779     stored in pandas' DataFrames.
780
781     The data structure:
782     - job name
783       - build number
784         - metadata
785           (as described in ExecutionChecker documentation)
786         - suites
787           (as described in ExecutionChecker documentation)
788         - tests
789           (as described in ExecutionChecker documentation)
790     """
791
792     def __init__(self, spec):
793         """Initialization.
794
795         :param spec: Specification.
796         :type spec: Specification
797         """
798
799         # Specification:
800         self._cfg = spec
801
802         # Data store:
803         self._input_data = pd.Series()
804
805     @property
806     def data(self):
807         """Getter - Input data.
808
809         :returns: Input data
810         :rtype: pandas.Series
811         """
812         return self._input_data
813
814     def metadata(self, job, build):
815         """Getter - metadata
816
817         :param job: Job which metadata we want.
818         :param build: Build which metadata we want.
819         :type job: str
820         :type build: str
821         :returns: Metadata
822         :rtype: pandas.Series
823         """
824
825         return self.data[job][build]["metadata"]
826
827     def suites(self, job, build):
828         """Getter - suites
829
830         :param job: Job which suites we want.
831         :param build: Build which suites we want.
832         :type job: str
833         :type build: str
834         :returns: Suites.
835         :rtype: pandas.Series
836         """
837
838         return self.data[job][str(build)]["suites"]
839
840     def tests(self, job, build):
841         """Getter - tests
842
843         :param job: Job which tests we want.
844         :param build: Build which tests we want.
845         :type job: str
846         :type build: str
847         :returns: Tests.
848         :rtype: pandas.Series
849         """
850
851         return self.data[job][build]["tests"]
852
853     @staticmethod
854     def _parse_tests(job, build, log):
855         """Process data from robot output.xml file and return JSON structured
856         data.
857
858         :param job: The name of job which build output data will be processed.
859         :param build: The build which output data will be processed.
860         :param log: List of log messages.
861         :type job: str
862         :type build: dict
863         :type log: list of tuples (severity, msg)
864         :returns: JSON data structure.
865         :rtype: dict
866         """
867
868         metadata = {
869             "job": job,
870             "build": build
871         }
872
873         with open(build["file-name"], 'r') as data_file:
874             try:
875                 result = ExecutionResult(data_file)
876             except errors.DataError as err:
877                 log.append(("ERROR", "Error occurred while parsing output.xml: "
878                                      "{0}".format(err)))
879                 return None
880         checker = ExecutionChecker(metadata)
881         result.visit(checker)
882
883         return checker.data
884
885     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
886         """Download and parse the input data file.
887
888         :param pid: PID of the process executing this method.
889         :param data_queue: Shared memory between processes. Queue which keeps
890             the result data. This data is then read by the main process and used
891             in further processing.
892         :param job: Name of the Jenkins job which generated the processed input
893             file.
894         :param build: Information about the Jenkins build which generated the
895             processed input file.
896         :param repeat: Repeat the download specified number of times if not
897             successful.
898         :type pid: int
899         :type data_queue: multiprocessing.Manager().Queue()
900         :type job: str
901         :type build: dict
902         :type repeat: int
903         """
904
905         logs = list()
906
907         logging.info("  Processing the job/build: {0}: {1}".
908                      format(job, build["build"]))
909
910         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
911                      format(job, build["build"])))
912
913         state = "failed"
914         success = False
915         data = None
916         do_repeat = repeat
917         while do_repeat:
918             success = download_and_unzip_data_file(self._cfg, job, build, pid,
919                                                    logs)
920             if success:
921                 break
922             do_repeat -= 1
923         if not success:
924             logs.append(("ERROR", "It is not possible to download the input "
925                                   "data file from the job '{job}', build "
926                                   "'{build}', or it is damaged. Skipped.".
927                          format(job=job, build=build["build"])))
928         if success:
929             logs.append(("INFO", "  Processing data from the build '{0}' ...".
930                          format(build["build"])))
931             data = InputData._parse_tests(job, build, logs)
932             if data is None:
933                 logs.append(("ERROR", "Input data file from the job '{job}', "
934                                       "build '{build}' is damaged. Skipped.".
935                              format(job=job, build=build["build"])))
936             else:
937                 state = "processed"
938
939             try:
940                 remove(build["file-name"])
941             except OSError as err:
942                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
943                              format(build["file-name"], err)))
944         logs.append(("INFO", "  Done."))
945
946         result = {
947             "data": data,
948             "state": state,
949             "job": job,
950             "build": build,
951             "logs": logs
952         }
953         data_queue.put(result)
954
955     def download_and_parse_data(self, repeat=1):
956         """Download the input data files, parse input data from input files and
957         store in pandas' Series.
958
959         :param repeat: Repeat the download specified number of times if not
960             successful.
961         :type repeat: int
962         """
963
964         logging.info("Downloading and parsing input files ...")
965
966         work_queue = multiprocessing.JoinableQueue()
967         manager = multiprocessing.Manager()
968         data_queue = manager.Queue()
969         cpus = multiprocessing.cpu_count()
970
971         workers = list()
972         for cpu in range(cpus):
973             worker = Worker(work_queue,
974                             data_queue,
975                             self._download_and_parse_build)
976             worker.daemon = True
977             worker.start()
978             workers.append(worker)
979             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
980                       format(cpu, worker.pid))
981
982         for job, builds in self._cfg.builds.items():
983             for build in builds:
984                 work_queue.put((job, build, repeat))
985
986         work_queue.join()
987
988         logging.info("Done.")
989
990         while not data_queue.empty():
991             result = data_queue.get()
992
993             job = result["job"]
994             build_nr = result["build"]["build"]
995
996             if result["data"]:
997                 data = result["data"]
998                 build_data = pd.Series({
999                     "metadata": pd.Series(data["metadata"].values(),
1000                                           index=data["metadata"].keys()),
1001                     "suites": pd.Series(data["suites"].values(),
1002                                         index=data["suites"].keys()),
1003                     "tests": pd.Series(data["tests"].values(),
1004                                        index=data["tests"].keys())})
1005
1006                 if self._input_data.get(job, None) is None:
1007                     self._input_data[job] = pd.Series()
1008                 self._input_data[job][str(build_nr)] = build_data
1009
1010                 self._cfg.set_input_file_name(job, build_nr,
1011                                               result["build"]["file-name"])
1012
1013             self._cfg.set_input_state(job, build_nr, result["state"])
1014
1015             for item in result["logs"]:
1016                 if item[0] == "INFO":
1017                     logging.info(item[1])
1018                 elif item[0] == "ERROR":
1019                     logging.error(item[1])
1020                 elif item[0] == "DEBUG":
1021                     logging.debug(item[1])
1022                 elif item[0] == "CRITICAL":
1023                     logging.critical(item[1])
1024                 elif item[0] == "WARNING":
1025                     logging.warning(item[1])
1026
1027         del data_queue
1028
1029         # Terminate all workers
1030         for worker in workers:
1031             worker.terminate()
1032             worker.join()
1033
1034         logging.info("Done.")
1035
1036     @staticmethod
1037     def _end_of_tag(tag_filter, start=0, closer="'"):
1038         """Return the index of character in the string which is the end of tag.
1039
1040         :param tag_filter: The string where the end of tag is being searched.
1041         :param start: The index where the searching is stated.
1042         :param closer: The character which is the tag closer.
1043         :type tag_filter: str
1044         :type start: int
1045         :type closer: str
1046         :returns: The index of the tag closer.
1047         :rtype: int
1048         """
1049
1050         try:
1051             idx_opener = tag_filter.index(closer, start)
1052             return tag_filter.index(closer, idx_opener + 1)
1053         except ValueError:
1054             return None
1055
1056     @staticmethod
1057     def _condition(tag_filter):
1058         """Create a conditional statement from the given tag filter.
1059
1060         :param tag_filter: Filter based on tags from the element specification.
1061         :type tag_filter: str
1062         :returns: Conditional statement which can be evaluated.
1063         :rtype: str
1064         """
1065
1066         index = 0
1067         while True:
1068             index = InputData._end_of_tag(tag_filter, index)
1069             if index is None:
1070                 return tag_filter
1071             index += 1
1072             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1073
1074     def filter_data(self, element, params=None, data_set="tests",
1075                     continue_on_error=False):
1076         """Filter required data from the given jobs and builds.
1077
1078         The output data structure is:
1079
1080         - job 1
1081           - build 1
1082             - test (or suite) 1 ID:
1083               - param 1
1084               - param 2
1085               ...
1086               - param n
1087             ...
1088             - test (or suite) n ID:
1089             ...
1090           ...
1091           - build n
1092         ...
1093         - job n
1094
1095         :param element: Element which will use the filtered data.
1096         :param params: Parameters which will be included in the output. If None,
1097         all parameters are included.
1098         :param data_set: The set of data to be filtered: tests, suites,
1099         metadata.
1100         :param continue_on_error: Continue if there is error while reading the
1101         data. The Item will be empty then
1102         :type element: pandas.Series
1103         :type params: list
1104         :type data_set: str
1105         :type continue_on_error: bool
1106         :returns: Filtered data.
1107         :rtype pandas.Series
1108         """
1109
1110         try:
1111             if element["filter"] in ("all", "template"):
1112                 cond = "True"
1113             else:
1114                 cond = InputData._condition(element["filter"])
1115             logging.debug("   Filter: {0}".format(cond))
1116         except KeyError:
1117             logging.error("  No filter defined.")
1118             return None
1119
1120         if params is None:
1121             params = element.get("parameters", None)
1122
1123         data = pd.Series()
1124         try:
1125             for job, builds in element["data"].items():
1126                 data[job] = pd.Series()
1127                 for build in builds:
1128                     data[job][str(build)] = pd.Series()
1129                     try:
1130                         data_iter = self.data[job][str(build)][data_set].\
1131                             iteritems()
1132                     except KeyError:
1133                         if continue_on_error:
1134                             continue
1135                         else:
1136                             return None
1137                     for test_ID, test_data in data_iter:
1138                         if eval(cond, {"tags": test_data.get("tags", "")}):
1139                             data[job][str(build)][test_ID] = pd.Series()
1140                             if params is None:
1141                                 for param, val in test_data.items():
1142                                     data[job][str(build)][test_ID][param] = val
1143                             else:
1144                                 for param in params:
1145                                     try:
1146                                         data[job][str(build)][test_ID][param] =\
1147                                             test_data[param]
1148                                     except KeyError:
1149                                         data[job][str(build)][test_ID][param] =\
1150                                             "No Data"
1151             return data
1152
1153         except (KeyError, IndexError, ValueError) as err:
1154             logging.error("   Missing mandatory parameter in the element "
1155                           "specification: {0}".format(err))
1156             return None
1157         except AttributeError:
1158             return None
1159         except SyntaxError:
1160             logging.error("   The filter '{0}' is not correct. Check if all "
1161                           "tags are enclosed by apostrophes.".format(cond))
1162             return None
1163
1164     @staticmethod
1165     def merge_data(data):
1166         """Merge data from more jobs and builds to a simple data structure.
1167
1168         The output data structure is:
1169
1170         - test (suite) 1 ID:
1171           - param 1
1172           - param 2
1173           ...
1174           - param n
1175         ...
1176         - test (suite) n ID:
1177         ...
1178
1179         :param data: Data to merge.
1180         :type data: pandas.Series
1181         :returns: Merged data.
1182         :rtype: pandas.Series
1183         """
1184
1185         logging.info("    Merging data ...")
1186
1187         merged_data = pd.Series()
1188         for _, builds in data.iteritems():
1189             for _, item in builds.iteritems():
1190                 for ID, item_data in item.iteritems():
1191                     merged_data[ID] = item_data
1192
1193         return merged_data