CSIT-1178: Prepare for bursty MRR
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
34
35 from input_data_files import download_and_unzip_data_file
36 from utils import Worker
37
38
39 class ExecutionChecker(ResultVisitor):
40     """Class to traverse through the test suite structure.
41
42     The functionality implemented in this class generates a json structure:
43
44     Performance tests:
45
46     {
47         "metadata": {
48             "generated": "Timestamp",
49             "version": "SUT version",
50             "job": "Jenkins job name",
51             "build": "Information about the build"
52         },
53         "suites": {
54             "Suite long name 1": {
55                 "name": Suite name,
56                 "doc": "Suite 1 documentation",
57                 "parent": "Suite 1 parent",
58                 "level": "Level of the suite in the suite hierarchy"
59             }
60             "Suite long name N": {
61                 "name": Suite name,
62                 "doc": "Suite N documentation",
63                 "parent": "Suite 2 parent",
64                 "level": "Level of the suite in the suite hierarchy"
65             }
66         }
67         "tests": {
68             "ID": {
69                 "name": "Test name",
70                 "parent": "Name of the parent of the test",
71                 "doc": "Test documentation"
72                 "msg": "Test message"
73                 "tags": ["tag 1", "tag 2", "tag n"],
74                 "type": "PDR" | "NDR" | "TCP" | "MRR" | "BMRR",
75                 "throughput": {  # Only type: "PDR" | "NDR"
76                     "value": int,
77                     "unit": "pps" | "bps" | "percentage"
78                 },
79                 "latency": {  # Only type: "PDR" | "NDR"
80                     "direction1": {
81                         "100": {
82                             "min": int,
83                             "avg": int,
84                             "max": int
85                         },
86                         "50": {  # Only for NDR
87                             "min": int,
88                             "avg": int,
89                             "max": int
90                         },
91                         "10": {  # Only for NDR
92                             "min": int,
93                             "avg": int,
94                             "max": int
95                         }
96                     },
97                     "direction2": {
98                         "100": {
99                             "min": int,
100                             "avg": int,
101                             "max": int
102                         },
103                         "50": {  # Only for NDR
104                             "min": int,
105                             "avg": int,
106                             "max": int
107                         },
108                         "10": {  # Only for NDR
109                             "min": int,
110                             "avg": int,
111                             "max": int
112                         }
113                     }
114                 },
115                 "result": {  # Only type: "TCP"
116                     "value": int,
117                     "unit": "cps" | "rps"
118                 },
119                 "result": {  # Only type: "MRR" | "BMRR"
120                     "receive-rate": AvgStdevMetadata,
121                 },
122                 "lossTolerance": "lossTolerance",  # Only type: "PDR"
123                 "vat-history": "DUT1 and DUT2 VAT History"
124                 "show-run": "Show Run"
125             },
126             "ID" {
127                 # next test
128             }
129         }
130     }
131
132
133     Functional tests:
134
135     {
136         "metadata": {  # Optional
137             "version": "VPP version",
138             "job": "Jenkins job name",
139             "build": "Information about the build"
140         },
141         "suites": {
142             "Suite name 1": {
143                 "doc": "Suite 1 documentation",
144                 "parent": "Suite 1 parent",
145                 "level": "Level of the suite in the suite hierarchy"
146             }
147             "Suite name N": {
148                 "doc": "Suite N documentation",
149                 "parent": "Suite 2 parent",
150                 "level": "Level of the suite in the suite hierarchy"
151             }
152         }
153         "tests": {
154             "ID": {
155                 "name": "Test name",
156                 "parent": "Name of the parent of the test",
157                 "doc": "Test documentation"
158                 "msg": "Test message"
159                 "tags": ["tag 1", "tag 2", "tag n"],
160                 "vat-history": "DUT1 and DUT2 VAT History"
161                 "show-run": "Show Run"
162                 "status": "PASS" | "FAIL"
163             },
164             "ID" {
165                 # next test
166             }
167         }
168     }
169
170     .. note:: ID is the lowercase full path to the test.
171     """
172
173     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
174
175     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
176                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
177                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
178                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
179                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
180                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
181                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
182
183     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
184                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
185                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
186
187     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
188                                  r'[\D\d]*')
189
190     REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
191
192     REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
193                                     r"(RTE Version: 'DPDK )(.*)(')")
194
195     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
196
197     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
198                            r'tx\s(\d*),\srx\s(\d*)')
199
200     REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
201                             r' in packets per second: \[(.*)\]')
202
203     REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
204
205     REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')
206
207     REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
208
209     def __init__(self, metadata):
210         """Initialisation.
211
212         :param metadata: Key-value pairs to be included in "metadata" part of
213         JSON structure.
214         :type metadata: dict
215         """
216
217         # Type of message to parse out from the test messages
218         self._msg_type = None
219
220         # VPP version
221         self._version = None
222
223         # Timestamp
224         self._timestamp = None
225
226         # Number of VAT History messages found:
227         # 0 - no message
228         # 1 - VAT History of DUT1
229         # 2 - VAT History of DUT2
230         self._lookup_kw_nr = 0
231         self._vat_history_lookup_nr = 0
232
233         # Number of Show Running messages found
234         # 0 - no message
235         # 1 - Show run message found
236         self._show_run_lookup_nr = 0
237
238         # Test ID of currently processed test- the lowercase full path to the
239         # test
240         self._test_ID = None
241
242         # The main data structure
243         self._data = {
244             "metadata": OrderedDict(),
245             "suites": OrderedDict(),
246             "tests": OrderedDict()
247         }
248
249         # Save the provided metadata
250         for key, val in metadata.items():
251             self._data["metadata"][key] = val
252
253         # Dictionary defining the methods used to parse different types of
254         # messages
255         self.parse_msg = {
256             "timestamp": self._get_timestamp,
257             "vpp-version": self._get_vpp_version,
258             "dpdk-version": self._get_dpdk_version,
259             "teardown-vat-history": self._get_vat_history,
260             "test-show-runtime": self._get_show_run
261         }
262
263     @property
264     def data(self):
265         """Getter - Data parsed from the XML file.
266
267         :returns: Data parsed from the XML file.
268         :rtype: dict
269         """
270         return self._data
271
272     def _get_vpp_version(self, msg):
273         """Called when extraction of VPP version is required.
274
275         :param msg: Message to process.
276         :type msg: Message
277         :returns: Nothing.
278         """
279
280         if msg.message.count("return STDOUT Version:"):
281             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
282                                 group(2))
283             self._data["metadata"]["version"] = self._version
284             self._msg_type = None
285
286     def _get_dpdk_version(self, msg):
287         """Called when extraction of DPDK version is required.
288
289         :param msg: Message to process.
290         :type msg: Message
291         :returns: Nothing.
292         """
293
294         if msg.message.count("return STDOUT testpmd"):
295             try:
296                 self._version = str(re.search(
297                     self.REGEX_VERSION_DPDK, msg.message). group(4))
298                 self._data["metadata"]["version"] = self._version
299             except IndexError:
300                 pass
301             finally:
302                 self._msg_type = None
303
304     def _get_timestamp(self, msg):
305         """Called when extraction of timestamp is required.
306
307         :param msg: Message to process.
308         :type msg: Message
309         :returns: Nothing.
310         """
311
312         self._timestamp = msg.timestamp[:14]
313         self._data["metadata"]["generated"] = self._timestamp
314         self._msg_type = None
315
316     def _get_vat_history(self, msg):
317         """Called when extraction of VAT command history is required.
318
319         :param msg: Message to process.
320         :type msg: Message
321         :returns: Nothing.
322         """
323         if msg.message.count("VAT command history:"):
324             self._vat_history_lookup_nr += 1
325             if self._vat_history_lookup_nr == 1:
326                 self._data["tests"][self._test_ID]["vat-history"] = str()
327             else:
328                 self._msg_type = None
329             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
330                           "VAT command history:", "", msg.message, count=1). \
331                 replace("\n\n", "\n").replace('\n', ' |br| ').\
332                 replace('\r', '').replace('"', "'")
333
334             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
335             self._data["tests"][self._test_ID]["vat-history"] += \
336                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
337
338     def _get_show_run(self, msg):
339         """Called when extraction of VPP operational data (output of CLI command
340         Show Runtime) is required.
341
342         :param msg: Message to process.
343         :type msg: Message
344         :returns: Nothing.
345         """
346         if msg.message.count("return STDOUT Thread "):
347             self._show_run_lookup_nr += 1
348             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
349                 self._data["tests"][self._test_ID]["show-run"] = str()
350             if self._lookup_kw_nr > 1:
351                 self._msg_type = None
352             if self._show_run_lookup_nr == 1:
353                 text = msg.message.replace("vat# ", "").\
354                     replace("return STDOUT ", "").replace("\n\n", "\n").\
355                     replace('\n', ' |br| ').\
356                     replace('\r', '').replace('"', "'")
357                 try:
358                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
359                     self._data["tests"][self._test_ID]["show-run"] += \
360                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
361                 except KeyError:
362                     pass
363
364     def _get_latency(self, msg, test_type):
365         """Get the latency data from the test message.
366
367         :param msg: Message to be parsed.
368         :param test_type: Type of the test - NDR or PDR.
369         :type msg: str
370         :type test_type: str
371         :returns: Latencies parsed from the message.
372         :rtype: dict
373         """
374
375         if test_type == "NDR":
376             groups = re.search(self.REGEX_LAT_NDR, msg)
377             groups_range = range(1, 7)
378         elif test_type == "PDR":
379             groups = re.search(self.REGEX_LAT_PDR, msg)
380             groups_range = range(1, 3)
381         else:
382             return {}
383
384         latencies = list()
385         for idx in groups_range:
386             try:
387                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
388             except (AttributeError, ValueError):
389                 lat = [-1, -1, -1]
390             latencies.append(lat)
391
392         keys = ("min", "avg", "max")
393         latency = {
394             "direction1": {
395             },
396             "direction2": {
397             }
398         }
399
400         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
401         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
402         if test_type == "NDR":
403             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
404             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
405             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
406             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
407
408         return latency
409
410     def visit_suite(self, suite):
411         """Implements traversing through the suite and its direct children.
412
413         :param suite: Suite to process.
414         :type suite: Suite
415         :returns: Nothing.
416         """
417         if self.start_suite(suite) is not False:
418             suite.suites.visit(self)
419             suite.tests.visit(self)
420             self.end_suite(suite)
421
422     def start_suite(self, suite):
423         """Called when suite starts.
424
425         :param suite: Suite to process.
426         :type suite: Suite
427         :returns: Nothing.
428         """
429
430         try:
431             parent_name = suite.parent.name
432         except AttributeError:
433             return
434
435         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
436             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
437         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
438
439         self._data["suites"][suite.longname.lower().replace('"', "'").
440             replace(" ", "_")] = {
441                 "name": suite.name.lower(),
442                 "doc": doc_str,
443                 "parent": parent_name,
444                 "level": len(suite.longname.split("."))
445             }
446
447         suite.keywords.visit(self)
448
449     def end_suite(self, suite):
450         """Called when suite ends.
451
452         :param suite: Suite to process.
453         :type suite: Suite
454         :returns: Nothing.
455         """
456         pass
457
458     def visit_test(self, test):
459         """Implements traversing through the test.
460
461         :param test: Test to process.
462         :type test: Test
463         :returns: Nothing.
464         """
465         if self.start_test(test) is not False:
466             test.keywords.visit(self)
467             self.end_test(test)
468
469     def start_test(self, test):
470         """Called when test starts.
471
472         :param test: Test to process.
473         :type test: Test
474         :returns: Nothing.
475         """
476
477         tags = [str(tag) for tag in test.tags]
478         test_result = dict()
479         test_result["name"] = test.name.lower()
480         test_result["parent"] = test.parent.name.lower()
481         test_result["tags"] = tags
482         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
483             replace('\r', '').replace('[', ' |br| [')
484         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
485         test_result["msg"] = test.message.replace('\n', ' |br| '). \
486             replace('\r', '').replace('"', "'")
487         test_result["status"] = test.status
488         self._test_ID = test.longname.lower()
489         if test.status == "PASS" and ("NDRPDRDISC" in tags or
490                                       "TCP" in tags or
491                                       "MRR" in tags or
492                                       "BMRR" in tags):
493             if "NDRDISC" in tags:
494                 test_type = "NDR"
495             elif "PDRDISC" in tags:
496                 test_type = "PDR"
497             elif "TCP" in tags:
498                 test_type = "TCP"
499             elif "MRR" in tags:
500                 test_type = "MRR"
501             elif "FRMOBL" in tags or "BMRR" in tags:
502                 test_type = "BMRR"
503             else:
504                 return
505
506             test_result["type"] = test_type
507
508             # Replace info about cores (e.g. -1c-) with the info about threads
509             # and cores (e.g. -1t1c-) in the long test case names and in the
510             # test case names if necessary.
511             groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
512             if not groups:
513                 tag_count = 0
514                 for tag in test_result["tags"]:
515                     groups = re.search(self.REGEX_TC_TAG, tag)
516                     if groups:
517                         tag_count += 1
518                         tag_tc = tag
519
520                 if tag_count == 1:
521                     self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
522                                            "-{0}-".format(tag_tc.lower()),
523                                            self._test_ID,
524                                            count=1)
525                     test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
526                                                  "-{0}-".format(tag_tc.lower()),
527                                                  test_result["name"],
528                                                  count=1)
529                 else:
530                     test_result["status"] = "FAIL"
531                     logging.error("The test '{0}' has no or more than one "
532                                   "multi-threading tags.".format(self._test_ID))
533                     return
534
535             if test_type in ("NDR", "PDR"):
536                 try:
537                     rate_value = str(re.search(
538                         self.REGEX_RATE, test.message).group(1))
539                 except AttributeError:
540                     rate_value = "-1"
541                 try:
542                     rate_unit = str(re.search(
543                         self.REGEX_RATE, test.message).group(2))
544                 except AttributeError:
545                     rate_unit = "-1"
546
547                 test_result["throughput"] = dict()
548                 test_result["throughput"]["value"] = \
549                     int(rate_value.split('.')[0])
550                 test_result["throughput"]["unit"] = rate_unit
551                 test_result["latency"] = \
552                     self._get_latency(test.message, test_type)
553                 if test_type == "PDR":
554                     test_result["lossTolerance"] = str(re.search(
555                         self.REGEX_TOLERANCE, test.message).group(1))
556
557             elif test_type in ("TCP", ):
558                 groups = re.search(self.REGEX_TCP, test.message)
559                 test_result["result"] = dict()
560                 test_result["result"]["value"] = int(groups.group(2))
561                 test_result["result"]["unit"] = groups.group(1)
562
563             elif test_type in ("MRR", "BMRR"):
564                 test_result["result"] = dict()
565                 groups = re.search(self.REGEX_BMRR, test.message)
566                 if groups is not None:
567                     items_str = groups.group(1)
568                     items_float = [float(item.strip()) for item
569                                    in items_str.split(",")]
570                     test_result["result"]["receive-rate"] = \
571                         AvgStdevMetadataFactory.from_data(items_float)
572                 else:
573                     groups = re.search(self.REGEX_MRR, test.message)
574                     test_result["result"]["receive-rate"] = \
575                         AvgStdevMetadataFactory.from_data([
576                             float(groups.group(3)) / float(groups.group(1)), ])
577
578         self._data["tests"][self._test_ID] = test_result
579
580     def end_test(self, test):
581         """Called when test ends.
582
583         :param test: Test to process.
584         :type test: Test
585         :returns: Nothing.
586         """
587         pass
588
589     def visit_keyword(self, keyword):
590         """Implements traversing through the keyword and its child keywords.
591
592         :param keyword: Keyword to process.
593         :type keyword: Keyword
594         :returns: Nothing.
595         """
596         if self.start_keyword(keyword) is not False:
597             self.end_keyword(keyword)
598
599     def start_keyword(self, keyword):
600         """Called when keyword starts. Default implementation does nothing.
601
602         :param keyword: Keyword to process.
603         :type keyword: Keyword
604         :returns: Nothing.
605         """
606         try:
607             if keyword.type == "setup":
608                 self.visit_setup_kw(keyword)
609             elif keyword.type == "teardown":
610                 self._lookup_kw_nr = 0
611                 self.visit_teardown_kw(keyword)
612             else:
613                 self._lookup_kw_nr = 0
614                 self.visit_test_kw(keyword)
615         except AttributeError:
616             pass
617
618     def end_keyword(self, keyword):
619         """Called when keyword ends. Default implementation does nothing.
620
621         :param keyword: Keyword to process.
622         :type keyword: Keyword
623         :returns: Nothing.
624         """
625         pass
626
627     def visit_test_kw(self, test_kw):
628         """Implements traversing through the test keyword and its child
629         keywords.
630
631         :param test_kw: Keyword to process.
632         :type test_kw: Keyword
633         :returns: Nothing.
634         """
635         for keyword in test_kw.keywords:
636             if self.start_test_kw(keyword) is not False:
637                 self.visit_test_kw(keyword)
638                 self.end_test_kw(keyword)
639
640     def start_test_kw(self, test_kw):
641         """Called when test keyword starts. Default implementation does
642         nothing.
643
644         :param test_kw: Keyword to process.
645         :type test_kw: Keyword
646         :returns: Nothing.
647         """
648         if test_kw.name.count("Show Runtime Counters On All Duts"):
649             self._lookup_kw_nr += 1
650             self._show_run_lookup_nr = 0
651             self._msg_type = "test-show-runtime"
652         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
653             self._msg_type = "dpdk-version"
654         else:
655             return
656         test_kw.messages.visit(self)
657
658     def end_test_kw(self, test_kw):
659         """Called when keyword ends. Default implementation does nothing.
660
661         :param test_kw: Keyword to process.
662         :type test_kw: Keyword
663         :returns: Nothing.
664         """
665         pass
666
667     def visit_setup_kw(self, setup_kw):
668         """Implements traversing through the teardown keyword and its child
669         keywords.
670
671         :param setup_kw: Keyword to process.
672         :type setup_kw: Keyword
673         :returns: Nothing.
674         """
675         for keyword in setup_kw.keywords:
676             if self.start_setup_kw(keyword) is not False:
677                 self.visit_setup_kw(keyword)
678                 self.end_setup_kw(keyword)
679
680     def start_setup_kw(self, setup_kw):
681         """Called when teardown keyword starts. Default implementation does
682         nothing.
683
684         :param setup_kw: Keyword to process.
685         :type setup_kw: Keyword
686         :returns: Nothing.
687         """
688         if setup_kw.name.count("Show Vpp Version On All Duts") \
689                 and not self._version:
690             self._msg_type = "vpp-version"
691
692         elif setup_kw.name.count("Setup performance global Variables") \
693                 and not self._timestamp:
694             self._msg_type = "timestamp"
695         else:
696             return
697         setup_kw.messages.visit(self)
698
699     def end_setup_kw(self, setup_kw):
700         """Called when keyword ends. Default implementation does nothing.
701
702         :param setup_kw: Keyword to process.
703         :type setup_kw: Keyword
704         :returns: Nothing.
705         """
706         pass
707
708     def visit_teardown_kw(self, teardown_kw):
709         """Implements traversing through the teardown keyword and its child
710         keywords.
711
712         :param teardown_kw: Keyword to process.
713         :type teardown_kw: Keyword
714         :returns: Nothing.
715         """
716         for keyword in teardown_kw.keywords:
717             if self.start_teardown_kw(keyword) is not False:
718                 self.visit_teardown_kw(keyword)
719                 self.end_teardown_kw(keyword)
720
721     def start_teardown_kw(self, teardown_kw):
722         """Called when teardown keyword starts. Default implementation does
723         nothing.
724
725         :param teardown_kw: Keyword to process.
726         :type teardown_kw: Keyword
727         :returns: Nothing.
728         """
729
730         if teardown_kw.name.count("Show Vat History On All Duts"):
731             self._vat_history_lookup_nr = 0
732             self._msg_type = "teardown-vat-history"
733             teardown_kw.messages.visit(self)
734
735     def end_teardown_kw(self, teardown_kw):
736         """Called when keyword ends. Default implementation does nothing.
737
738         :param teardown_kw: Keyword to process.
739         :type teardown_kw: Keyword
740         :returns: Nothing.
741         """
742         pass
743
744     def visit_message(self, msg):
745         """Implements visiting the message.
746
747         :param msg: Message to process.
748         :type msg: Message
749         :returns: Nothing.
750         """
751         if self.start_message(msg) is not False:
752             self.end_message(msg)
753
754     def start_message(self, msg):
755         """Called when message starts. Get required information from messages:
756         - VPP version.
757
758         :param msg: Message to process.
759         :type msg: Message
760         :returns: Nothing.
761         """
762
763         if self._msg_type:
764             self.parse_msg[self._msg_type](msg)
765
766     def end_message(self, msg):
767         """Called when message ends. Default implementation does nothing.
768
769         :param msg: Message to process.
770         :type msg: Message
771         :returns: Nothing.
772         """
773         pass
774
775
776 class InputData(object):
777     """Input data
778
779     The data is extracted from output.xml files generated by Jenkins jobs and
780     stored in pandas' DataFrames.
781
782     The data structure:
783     - job name
784       - build number
785         - metadata
786           (as described in ExecutionChecker documentation)
787         - suites
788           (as described in ExecutionChecker documentation)
789         - tests
790           (as described in ExecutionChecker documentation)
791     """
792
793     def __init__(self, spec):
794         """Initialization.
795
796         :param spec: Specification.
797         :type spec: Specification
798         """
799
800         # Specification:
801         self._cfg = spec
802
803         # Data store:
804         self._input_data = pd.Series()
805
806     @property
807     def data(self):
808         """Getter - Input data.
809
810         :returns: Input data
811         :rtype: pandas.Series
812         """
813         return self._input_data
814
815     def metadata(self, job, build):
816         """Getter - metadata
817
818         :param job: Job which metadata we want.
819         :param build: Build which metadata we want.
820         :type job: str
821         :type build: str
822         :returns: Metadata
823         :rtype: pandas.Series
824         """
825
826         return self.data[job][build]["metadata"]
827
828     def suites(self, job, build):
829         """Getter - suites
830
831         :param job: Job which suites we want.
832         :param build: Build which suites we want.
833         :type job: str
834         :type build: str
835         :returns: Suites.
836         :rtype: pandas.Series
837         """
838
839         return self.data[job][str(build)]["suites"]
840
841     def tests(self, job, build):
842         """Getter - tests
843
844         :param job: Job which tests we want.
845         :param build: Build which tests we want.
846         :type job: str
847         :type build: str
848         :returns: Tests.
849         :rtype: pandas.Series
850         """
851
852         return self.data[job][build]["tests"]
853
854     @staticmethod
855     def _parse_tests(job, build, log):
856         """Process data from robot output.xml file and return JSON structured
857         data.
858
859         :param job: The name of job which build output data will be processed.
860         :param build: The build which output data will be processed.
861         :param log: List of log messages.
862         :type job: str
863         :type build: dict
864         :type log: list of tuples (severity, msg)
865         :returns: JSON data structure.
866         :rtype: dict
867         """
868
869         metadata = {
870             "job": job,
871             "build": build
872         }
873
874         with open(build["file-name"], 'r') as data_file:
875             try:
876                 result = ExecutionResult(data_file)
877             except errors.DataError as err:
878                 log.append(("ERROR", "Error occurred while parsing output.xml: "
879                                      "{0}".format(err)))
880                 return None
881         checker = ExecutionChecker(metadata)
882         result.visit(checker)
883
884         return checker.data
885
886     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
887         """Download and parse the input data file.
888
889         :param pid: PID of the process executing this method.
890         :param data_queue: Shared memory between processes. Queue which keeps
891             the result data. This data is then read by the main process and used
892             in further processing.
893         :param job: Name of the Jenkins job which generated the processed input
894             file.
895         :param build: Information about the Jenkins build which generated the
896             processed input file.
897         :param repeat: Repeat the download specified number of times if not
898             successful.
899         :type pid: int
900         :type data_queue: multiprocessing.Manager().Queue()
901         :type job: str
902         :type build: dict
903         :type repeat: int
904         """
905
906         logs = list()
907
908         logging.info("  Processing the job/build: {0}: {1}".
909                      format(job, build["build"]))
910
911         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
912                      format(job, build["build"])))
913
914         state = "failed"
915         success = False
916         data = None
917         do_repeat = repeat
918         while do_repeat:
919             success = download_and_unzip_data_file(self._cfg, job, build, pid,
920                                                    logs)
921             if success:
922                 break
923             do_repeat -= 1
924         if not success:
925             logs.append(("ERROR", "It is not possible to download the input "
926                                   "data file from the job '{job}', build "
927                                   "'{build}', or it is damaged. Skipped.".
928                          format(job=job, build=build["build"])))
929         if success:
930             logs.append(("INFO", "  Processing data from the build '{0}' ...".
931                          format(build["build"])))
932             data = InputData._parse_tests(job, build, logs)
933             if data is None:
934                 logs.append(("ERROR", "Input data file from the job '{job}', "
935                                       "build '{build}' is damaged. Skipped.".
936                              format(job=job, build=build["build"])))
937             else:
938                 state = "processed"
939
940             try:
941                 remove(build["file-name"])
942             except OSError as err:
943                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
944                              format(build["file-name"], err)))
945         logs.append(("INFO", "  Done."))
946
947         result = {
948             "data": data,
949             "state": state,
950             "job": job,
951             "build": build,
952             "logs": logs
953         }
954         data_queue.put(result)
955
956     def download_and_parse_data(self, repeat=1):
957         """Download the input data files, parse input data from input files and
958         store in pandas' Series.
959
960         :param repeat: Repeat the download specified number of times if not
961             successful.
962         :type repeat: int
963         """
964
965         logging.info("Downloading and parsing input files ...")
966
967         work_queue = multiprocessing.JoinableQueue()
968         manager = multiprocessing.Manager()
969         data_queue = manager.Queue()
970         cpus = multiprocessing.cpu_count()
971
972         workers = list()
973         for cpu in range(cpus):
974             worker = Worker(work_queue,
975                             data_queue,
976                             self._download_and_parse_build)
977             worker.daemon = True
978             worker.start()
979             workers.append(worker)
980             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
981                       format(cpu, worker.pid))
982
983         for job, builds in self._cfg.builds.items():
984             for build in builds:
985                 work_queue.put((job, build, repeat))
986
987         work_queue.join()
988
989         logging.info("Done.")
990
991         while not data_queue.empty():
992             result = data_queue.get()
993
994             job = result["job"]
995             build_nr = result["build"]["build"]
996
997             if result["data"]:
998                 data = result["data"]
999                 build_data = pd.Series({
1000                     "metadata": pd.Series(data["metadata"].values(),
1001                                           index=data["metadata"].keys()),
1002                     "suites": pd.Series(data["suites"].values(),
1003                                         index=data["suites"].keys()),
1004                     "tests": pd.Series(data["tests"].values(),
1005                                        index=data["tests"].keys())})
1006
1007                 if self._input_data.get(job, None) is None:
1008                     self._input_data[job] = pd.Series()
1009                 self._input_data[job][str(build_nr)] = build_data
1010
1011                 self._cfg.set_input_file_name(job, build_nr,
1012                                               result["build"]["file-name"])
1013
1014             self._cfg.set_input_state(job, build_nr, result["state"])
1015
1016             for item in result["logs"]:
1017                 if item[0] == "INFO":
1018                     logging.info(item[1])
1019                 elif item[0] == "ERROR":
1020                     logging.error(item[1])
1021                 elif item[0] == "DEBUG":
1022                     logging.debug(item[1])
1023                 elif item[0] == "CRITICAL":
1024                     logging.critical(item[1])
1025                 elif item[0] == "WARNING":
1026                     logging.warning(item[1])
1027
1028         del data_queue
1029
1030         # Terminate all workers
1031         for worker in workers:
1032             worker.terminate()
1033             worker.join()
1034
1035         logging.info("Done.")
1036
1037     @staticmethod
1038     def _end_of_tag(tag_filter, start=0, closer="'"):
1039         """Return the index of character in the string which is the end of tag.
1040
1041         :param tag_filter: The string where the end of tag is being searched.
1042         :param start: The index where the searching is stated.
1043         :param closer: The character which is the tag closer.
1044         :type tag_filter: str
1045         :type start: int
1046         :type closer: str
1047         :returns: The index of the tag closer.
1048         :rtype: int
1049         """
1050
1051         try:
1052             idx_opener = tag_filter.index(closer, start)
1053             return tag_filter.index(closer, idx_opener + 1)
1054         except ValueError:
1055             return None
1056
1057     @staticmethod
1058     def _condition(tag_filter):
1059         """Create a conditional statement from the given tag filter.
1060
1061         :param tag_filter: Filter based on tags from the element specification.
1062         :type tag_filter: str
1063         :returns: Conditional statement which can be evaluated.
1064         :rtype: str
1065         """
1066
1067         index = 0
1068         while True:
1069             index = InputData._end_of_tag(tag_filter, index)
1070             if index is None:
1071                 return tag_filter
1072             index += 1
1073             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1074
1075     def filter_data(self, element, params=None, data_set="tests",
1076                     continue_on_error=False):
1077         """Filter required data from the given jobs and builds.
1078
1079         The output data structure is:
1080
1081         - job 1
1082           - build 1
1083             - test (or suite) 1 ID:
1084               - param 1
1085               - param 2
1086               ...
1087               - param n
1088             ...
1089             - test (or suite) n ID:
1090             ...
1091           ...
1092           - build n
1093         ...
1094         - job n
1095
1096         :param element: Element which will use the filtered data.
1097         :param params: Parameters which will be included in the output. If None,
1098         all parameters are included.
1099         :param data_set: The set of data to be filtered: tests, suites,
1100         metadata.
1101         :param continue_on_error: Continue if there is error while reading the
1102         data. The Item will be empty then
1103         :type element: pandas.Series
1104         :type params: list
1105         :type data_set: str
1106         :type continue_on_error: bool
1107         :returns: Filtered data.
1108         :rtype pandas.Series
1109         """
1110
1111         try:
1112             if element["filter"] in ("all", "template"):
1113                 cond = "True"
1114             else:
1115                 cond = InputData._condition(element["filter"])
1116             logging.debug("   Filter: {0}".format(cond))
1117         except KeyError:
1118             logging.error("  No filter defined.")
1119             return None
1120
1121         if params is None:
1122             params = element.get("parameters", None)
1123
1124         data = pd.Series()
1125         try:
1126             for job, builds in element["data"].items():
1127                 data[job] = pd.Series()
1128                 for build in builds:
1129                     data[job][str(build)] = pd.Series()
1130                     try:
1131                         data_iter = self.data[job][str(build)][data_set].\
1132                             iteritems()
1133                     except KeyError:
1134                         if continue_on_error:
1135                             continue
1136                         else:
1137                             return None
1138                     for test_ID, test_data in data_iter:
1139                         if eval(cond, {"tags": test_data.get("tags", "")}):
1140                             data[job][str(build)][test_ID] = pd.Series()
1141                             if params is None:
1142                                 for param, val in test_data.items():
1143                                     data[job][str(build)][test_ID][param] = val
1144                             else:
1145                                 for param in params:
1146                                     try:
1147                                         data[job][str(build)][test_ID][param] =\
1148                                             test_data[param]
1149                                     except KeyError:
1150                                         data[job][str(build)][test_ID][param] =\
1151                                             "No Data"
1152             return data
1153
1154         except (KeyError, IndexError, ValueError) as err:
1155             logging.error("   Missing mandatory parameter in the element "
1156                           "specification: {0}".format(err))
1157             return None
1158         except AttributeError:
1159             return None
1160         except SyntaxError:
1161             logging.error("   The filter '{0}' is not correct. Check if all "
1162                           "tags are enclosed by apostrophes.".format(cond))
1163             return None
1164
1165     @staticmethod
1166     def merge_data(data):
1167         """Merge data from more jobs and builds to a simple data structure.
1168
1169         The output data structure is:
1170
1171         - test (suite) 1 ID:
1172           - param 1
1173           - param 2
1174           ...
1175           - param n
1176         ...
1177         - test (suite) n ID:
1178         ...
1179
1180         :param data: Data to merge.
1181         :type data: pandas.Series
1182         :returns: Merged data.
1183         :rtype: pandas.Series
1184         """
1185
1186         logging.info("    Merging data ...")
1187
1188         merged_data = pd.Series()
1189         for _, builds in data.iteritems():
1190             for _, item in builds.iteritems():
1191                 for ID, item_data in item.iteritems():
1192                     merged_data[ID] = item_data
1193
1194         return merged_data