52dc5823e53c4dffcaa66a80fc9ea86950d1921c
[csit.git] / resources / tools / presentation_new / input_data_parser.py
1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
- provide access to the data,
- filter the data using tags.
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from os.path import join
34 from datetime import datetime as dt
35 from datetime import timedelta
36 from json import loads
37 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
38
39 from input_data_files import download_and_unzip_data_file
40 from utils import Worker
41
42
43 # Separator used in file names
44 SEPARATOR = "__"
45
46
47 class ExecutionChecker(ResultVisitor):
48     """Class to traverse through the test suite structure.
49
50     The functionality implemented in this class generates a json structure:
51
52     Performance tests:
53
54     {
55         "metadata": {
56             "generated": "Timestamp",
57             "version": "SUT version",
58             "job": "Jenkins job name",
59             "build": "Information about the build"
60         },
61         "suites": {
62             "Suite long name 1": {
63                 "name": Suite name,
64                 "doc": "Suite 1 documentation",
65                 "parent": "Suite 1 parent",
66                 "level": "Level of the suite in the suite hierarchy"
67             }
68             "Suite long name N": {
69                 "name": Suite name,
70                 "doc": "Suite N documentation",
71                 "parent": "Suite 2 parent",
72                 "level": "Level of the suite in the suite hierarchy"
73             }
74         }
75         "tests": {
76             # NDRPDR tests:
77             "ID": {
78                 "name": "Test name",
79                 "parent": "Name of the parent of the test",
80                 "doc": "Test documentation",
81                 "msg": "Test message",
82                 "vat-history": "DUT1 and DUT2 VAT History",
83                 "show-run": "Show Run",
84                 "tags": ["tag 1", "tag 2", "tag n"],
85                 "type": "NDRPDR",
86                 "status": "PASS" | "FAIL",
87                 "throughput": {
88                     "NDR": {
89                         "LOWER": float,
90                         "UPPER": float
91                     },
92                     "PDR": {
93                         "LOWER": float,
94                         "UPPER": float
95                     }
96                 },
97                 "latency": {
98                     "NDR": {
99                         "direction1": {
100                             "min": float,
101                             "avg": float,
102                             "max": float
103                         },
104                         "direction2": {
105                             "min": float,
106                             "avg": float,
107                             "max": float
108                         }
109                     },
110                     "PDR": {
111                         "direction1": {
112                             "min": float,
113                             "avg": float,
114                             "max": float
115                         },
116                         "direction2": {
117                             "min": float,
118                             "avg": float,
119                             "max": float
120                         }
121                     }
122                 }
123             }
124
125             # TCP tests:
126             "ID": {
127                 "name": "Test name",
128                 "parent": "Name of the parent of the test",
129                 "doc": "Test documentation",
130                 "msg": "Test message",
131                 "tags": ["tag 1", "tag 2", "tag n"],
132                 "type": "TCP",
133                 "status": "PASS" | "FAIL",
134                 "result": int
135             }
136
137             # MRR, BMRR tests:
138             "ID": {
139                 "name": "Test name",
140                 "parent": "Name of the parent of the test",
141                 "doc": "Test documentation",
142                 "msg": "Test message",
143                 "tags": ["tag 1", "tag 2", "tag n"],
144                 "type": "MRR" | "BMRR",
145                 "status": "PASS" | "FAIL",
146                 "result": {
147                     "receive-rate": AvgStdevMetadata,
148                 }
149             }
150
151             # TODO: Remove when definitely no NDRPDRDISC tests are used:
152             # NDRPDRDISC tests:
153             "ID": {
154                 "name": "Test name",
155                 "parent": "Name of the parent of the test",
156                 "doc": "Test documentation",
157                 "msg": "Test message",
158                 "tags": ["tag 1", "tag 2", "tag n"],
159                 "type": "PDR" | "NDR",
160                 "status": "PASS" | "FAIL",
161                 "throughput": {  # Only type: "PDR" | "NDR"
162                     "value": int,
163                     "unit": "pps" | "bps" | "percentage"
164                 },
165                 "latency": {  # Only type: "PDR" | "NDR"
166                     "direction1": {
167                         "100": {
168                             "min": int,
169                             "avg": int,
170                             "max": int
171                         },
172                         "50": {  # Only for NDR
173                             "min": int,
174                             "avg": int,
175                             "max": int
176                         },
177                         "10": {  # Only for NDR
178                             "min": int,
179                             "avg": int,
180                             "max": int
181                         }
182                     },
183                     "direction2": {
184                         "100": {
185                             "min": int,
186                             "avg": int,
187                             "max": int
188                         },
189                         "50": {  # Only for NDR
190                             "min": int,
191                             "avg": int,
192                             "max": int
193                         },
194                         "10": {  # Only for NDR
195                             "min": int,
196                             "avg": int,
197                             "max": int
198                         }
199                     }
200                 },
201                 "lossTolerance": "lossTolerance",  # Only type: "PDR"
202                 "vat-history": "DUT1 and DUT2 VAT History"
203                 "show-run": "Show Run"
204             },
            "ID": {
206                 # next test
207             }
208         }
209     }
210
211
212     Functional tests:
213
214     {
215         "metadata": {  # Optional
216             "version": "VPP version",
217             "job": "Jenkins job name",
218             "build": "Information about the build"
219         },
220         "suites": {
221             "Suite name 1": {
222                 "doc": "Suite 1 documentation",
223                 "parent": "Suite 1 parent",
224                 "level": "Level of the suite in the suite hierarchy"
225             }
226             "Suite name N": {
227                 "doc": "Suite N documentation",
228                 "parent": "Suite 2 parent",
229                 "level": "Level of the suite in the suite hierarchy"
230             }
231         }
232         "tests": {
233             "ID": {
234                 "name": "Test name",
235                 "parent": "Name of the parent of the test",
236                 "doc": "Test documentation"
237                 "msg": "Test message"
238                 "tags": ["tag 1", "tag 2", "tag n"],
239                 "vat-history": "DUT1 and DUT2 VAT History"
240                 "show-run": "Show Run"
241                 "status": "PASS" | "FAIL"
242             },
            "ID": {
244                 # next test
245             }
246         }
247     }
248
249     .. note:: ID is the lowercase full path to the test.
250     """
251
    # TODO: Remove when definitely no NDRPDRDISC tests are used:
    # Final rate (value and unit) reported by NDRPDRDISC tests.
    REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')

    # Lower and upper bounds of NDR and PDR reported by NDRPDR tests.
    REGEX_NDRPDR_RATE = re.compile(r'NDR_LOWER:\s(\d+.\d+).*\n.*\n'
                                   r'NDR_UPPER:\s(\d+.\d+).*\n'
                                   r'PDR_LOWER:\s(\d+.\d+).*\n.*\n'
                                   r'PDR_UPPER:\s(\d+.\d+)')

    # TODO: Remove when definitely no NDRPDRDISC tests are used:
    # Three pairs of "min/avg/max" latency triples (one pair per direction),
    # presumably at 100%, 50% and 10% of NDR -- the percentage itself is not
    # captured (\d+%).
    REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
                               r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')

    # One pair of "min/avg/max" latency triples (one per direction) reported
    # by PDRDISC tests.
    REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
                               r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
                               r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')

    # Two per-direction latency strings for NDR and then PDR, reported by
    # NDRPDR tests.
    REGEX_NDRPDR_LAT = re.compile(r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
                                  r'LATENCY.*\[\'(.*)\', \'(.*)\'\]')

    # Loss tolerance reported by PDRDISC tests.
    REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
                                 r'[\D\d]*')

    # VPP version; group(2) is the version string itself.
    REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*|"
                                   r"VPP Version:\s*)(.*)")

    # DPDK version as reported by testpmd; group(4) is the version string.
    REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
                                    r"(RTE Version: 'DPDK )(.*)(')")

    # Result type and value reported by TCP tests; group(2) is the value.
    REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')

    # Trial duration [s], tx and rx packet counts reported by old-style MRR
    # tests.
    REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
                           r'tx\s(\d*),\srx\s(\d*)')

    # Comma separated list of per-trial results reported by BMRR tests.
    REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
                            r' in packets per second: \[(.*)\]')

    # Thread/core tag, e.g. "2T1C".
    REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')

    # Old-style threads/cores part of a test case name, e.g. "-1t1c-".
    REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')

    # New-style cores-only part of a test case name, e.g. "-1c-".
    REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')

    # Test case number prefix in a test name, e.g. "tc01-".
    REGEX_TC_NUMBER = re.compile(r'tc[0-9]{2}-')
300
301     def __init__(self, metadata, mapping, ignore):
302         """Initialisation.
303
304         :param metadata: Key-value pairs to be included in "metadata" part of
305             JSON structure.
306         :param mapping: Mapping of the old names of test cases to the new
307             (actual) one.
308         :param ignore: List of TCs to be ignored.
309         :type metadata: dict
310         :type mapping: dict
311         :type ignore: list
312         """
313
314         # Type of message to parse out from the test messages
315         self._msg_type = None
316
317         # VPP version
318         self._version = None
319
320         # Timestamp
321         self._timestamp = None
322
323         # Testbed. The testbed is identified by TG node IP address.
324         self._testbed = None
325
326         # Mapping of TCs long names
327         self._mapping = mapping
328
329         # Ignore list
330         self._ignore = ignore
331
332         # Number of VAT History messages found:
333         # 0 - no message
334         # 1 - VAT History of DUT1
335         # 2 - VAT History of DUT2
336         self._lookup_kw_nr = 0
337         self._vat_history_lookup_nr = 0
338
339         # Number of Show Running messages found
340         # 0 - no message
341         # 1 - Show run message found
342         self._show_run_lookup_nr = 0
343
344         # Test ID of currently processed test- the lowercase full path to the
345         # test
346         self._test_ID = None
347
348         # The main data structure
349         self._data = {
350             "metadata": OrderedDict(),
351             "suites": OrderedDict(),
352             "tests": OrderedDict()
353         }
354
355         # Save the provided metadata
356         for key, val in metadata.items():
357             self._data["metadata"][key] = val
358
359         # Dictionary defining the methods used to parse different types of
360         # messages
361         self.parse_msg = {
362             "timestamp": self._get_timestamp,
363             "vpp-version": self._get_vpp_version,
364             "dpdk-version": self._get_dpdk_version,
365             "teardown-vat-history": self._get_vat_history,
366             "test-show-runtime": self._get_show_run,
367             "testbed": self._get_testbed
368         }
369
    @property
    def data(self):
        """Getter - data parsed from the XML file.

        The returned dictionary holds the "metadata", "suites" and "tests"
        sub-dictionaries (see the class docstring for the full structure).

        :returns: Data parsed from the XML file.
        :rtype: dict
        """
        return self._data
378
    def _get_testbed(self, msg):
        """Called when extraction of testbed IP is required.
        The testbed is identified by TG node IP address.

        :param msg: Message to process.
        :type msg: Message
        :returns: Nothing.
        """

        if msg.message.count("Arguments:"):
            # Normalise the logged topology into parseable JSON: drop all
            # whitespace, turn single quotes into double quotes and strip
            # the bytes prefix (b"...").
            # NOTE(review): replacing "honeycom" with "honeycomb" repairs a
            # truncated name, but it would also corrupt an already complete
            # "honeycomb" into "honeycombb" -- presumably the full word never
            # occurs in these messages; verify against the log producer.
            message = str(msg.message).replace(' ', '').replace('\n', '').\
                replace("'", '"').replace('b"', '"').\
                replace("honeycom", "honeycomb")
            # Strip the leading 'Arguments:[' (11 characters) and the
            # trailing ']' -- assumes the whole message has exactly that
            # shape; TODO confirm.
            message = loads(message[11:-1])
            try:
                self._testbed = message["TG"]["host"]
            except (KeyError, ValueError):
                pass
            finally:
                # Record the testbed (possibly None) and stop expecting
                # further messages of this type.
                self._data["metadata"]["testbed"] = self._testbed
                self._msg_type = None
400
401     def _get_vpp_version(self, msg):
402         """Called when extraction of VPP version is required.
403
404         :param msg: Message to process.
405         :type msg: Message
406         :returns: Nothing.
407         """
408
409         if msg.message.count("return STDOUT Version:") or \
410             msg.message.count("VPP Version:"):
411             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
412                                 group(2))
413             self._data["metadata"]["version"] = self._version
414             self._msg_type = None
415
416     def _get_dpdk_version(self, msg):
417         """Called when extraction of DPDK version is required.
418
419         :param msg: Message to process.
420         :type msg: Message
421         :returns: Nothing.
422         """
423
424         if msg.message.count("return STDOUT testpmd"):
425             try:
426                 self._version = str(re.search(
427                     self.REGEX_VERSION_DPDK, msg.message). group(4))
428                 self._data["metadata"]["version"] = self._version
429             except IndexError:
430                 pass
431             finally:
432                 self._msg_type = None
433
434     def _get_timestamp(self, msg):
435         """Called when extraction of timestamp is required.
436
437         :param msg: Message to process.
438         :type msg: Message
439         :returns: Nothing.
440         """
441
442         self._timestamp = msg.timestamp[:14]
443         self._data["metadata"]["generated"] = self._timestamp
444         self._msg_type = None
445
446     def _get_vat_history(self, msg):
447         """Called when extraction of VAT command history is required.
448
449         :param msg: Message to process.
450         :type msg: Message
451         :returns: Nothing.
452         """
453         if msg.message.count("VAT command history:"):
454             self._vat_history_lookup_nr += 1
455             if self._vat_history_lookup_nr == 1:
456                 self._data["tests"][self._test_ID]["vat-history"] = str()
457             else:
458                 self._msg_type = None
459             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
460                           "VAT command history:", "", msg.message, count=1). \
461                 replace("\n\n", "\n").replace('\n', ' |br| ').\
462                 replace('\r', '').replace('"', "'")
463
464             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
465             self._data["tests"][self._test_ID]["vat-history"] += \
466                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
467
468     def _get_show_run(self, msg):
469         """Called when extraction of VPP operational data (output of CLI command
470         Show Runtime) is required.
471
472         :param msg: Message to process.
473         :type msg: Message
474         :returns: Nothing.
475         """
476         if msg.message.count("return STDOUT Thread "):
477             self._show_run_lookup_nr += 1
478             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
479                 self._data["tests"][self._test_ID]["show-run"] = str()
480             if self._lookup_kw_nr > 1:
481                 self._msg_type = None
482             if self._show_run_lookup_nr == 1:
483                 text = msg.message.replace("vat# ", "").\
484                     replace("return STDOUT ", "").replace("\n\n", "\n").\
485                     replace('\n', ' |br| ').\
486                     replace('\r', '').replace('"', "'")
487                 try:
488                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
489                     self._data["tests"][self._test_ID]["show-run"] += \
490                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
491                 except KeyError:
492                     pass
493
494     # TODO: Remove when definitely no NDRPDRDISC tests are used:
495     def _get_latency(self, msg, test_type):
496         """Get the latency data from the test message.
497
498         :param msg: Message to be parsed.
499         :param test_type: Type of the test - NDR or PDR.
500         :type msg: str
501         :type test_type: str
502         :returns: Latencies parsed from the message.
503         :rtype: dict
504         """
505
506         if test_type == "NDR":
507             groups = re.search(self.REGEX_LAT_NDR, msg)
508             groups_range = range(1, 7)
509         elif test_type == "PDR":
510             groups = re.search(self.REGEX_LAT_PDR, msg)
511             groups_range = range(1, 3)
512         else:
513             return {}
514
515         latencies = list()
516         for idx in groups_range:
517             try:
518                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
519             except (AttributeError, ValueError):
520                 lat = [-1, -1, -1]
521             latencies.append(lat)
522
523         keys = ("min", "avg", "max")
524         latency = {
525             "direction1": {
526             },
527             "direction2": {
528             }
529         }
530
531         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
532         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
533         if test_type == "NDR":
534             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
535             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
536             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
537             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
538
539         return latency
540
541     def _get_ndrpdr_throughput(self, msg):
542         """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
543         message.
544
545         :param msg: The test message to be parsed.
546         :type msg: str
547         :returns: Parsed data as a dict and the status (PASS/FAIL).
548         :rtype: tuple(dict, str)
549         """
550
551         throughput = {
552             "NDR": {"LOWER": -1.0, "UPPER": -1.0},
553             "PDR": {"LOWER": -1.0, "UPPER": -1.0}
554         }
555         status = "FAIL"
556         groups = re.search(self.REGEX_NDRPDR_RATE, msg)
557
558         if groups is not None:
559             try:
560                 throughput["NDR"]["LOWER"] = float(groups.group(1))
561                 throughput["NDR"]["UPPER"] = float(groups.group(2))
562                 throughput["PDR"]["LOWER"] = float(groups.group(3))
563                 throughput["PDR"]["UPPER"] = float(groups.group(4))
564                 status = "PASS"
565             except (IndexError, ValueError):
566                 pass
567
568         return throughput, status
569
570     def _get_ndrpdr_latency(self, msg):
571         """Get LATENCY from the test message.
572
573         :param msg: The test message to be parsed.
574         :type msg: str
575         :returns: Parsed data as a dict and the status (PASS/FAIL).
576         :rtype: tuple(dict, str)
577         """
578
579         latency = {
580             "NDR": {
581                 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
582                 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
583             },
584             "PDR": {
585                 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
586                 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
587             }
588         }
589         status = "FAIL"
590         groups = re.search(self.REGEX_NDRPDR_LAT, msg)
591
592         if groups is not None:
593             keys = ("min", "avg", "max")
594             try:
595                 latency["NDR"]["direction1"] = dict(
596                     zip(keys, [float(l) for l in groups.group(1).split('/')]))
597                 latency["NDR"]["direction2"] = dict(
598                     zip(keys, [float(l) for l in groups.group(2).split('/')]))
599                 latency["PDR"]["direction1"] = dict(
600                     zip(keys, [float(l) for l in groups.group(3).split('/')]))
601                 latency["PDR"]["direction2"] = dict(
602                     zip(keys, [float(l) for l in groups.group(4).split('/')]))
603                 status = "PASS"
604             except (IndexError, ValueError):
605                 pass
606
607         return latency, status
608
609     def visit_suite(self, suite):
610         """Implements traversing through the suite and its direct children.
611
612         :param suite: Suite to process.
613         :type suite: Suite
614         :returns: Nothing.
615         """
616         if self.start_suite(suite) is not False:
617             suite.suites.visit(self)
618             suite.tests.visit(self)
619             self.end_suite(suite)
620
621     def start_suite(self, suite):
622         """Called when suite starts.
623
624         :param suite: Suite to process.
625         :type suite: Suite
626         :returns: Nothing.
627         """
628
629         try:
630             parent_name = suite.parent.name
631         except AttributeError:
632             return
633
634         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
635             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
636         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
637
638         self._data["suites"][suite.longname.lower().replace('"', "'").
639             replace(" ", "_")] = {
640                 "name": suite.name.lower(),
641                 "doc": doc_str,
642                 "parent": parent_name,
643                 "level": len(suite.longname.split("."))
644             }
645
646         suite.keywords.visit(self)
647
    def end_suite(self, suite):
        """Called when suite ends.

        No post-processing of the suite is needed; this hook is a no-op.

        :param suite: Suite to process.
        :type suite: Suite
        :returns: Nothing.
        """
        pass
656
657     def visit_test(self, test):
658         """Implements traversing through the test.
659
660         :param test: Test to process.
661         :type test: Test
662         :returns: Nothing.
663         """
664         if self.start_test(test) is not False:
665             test.keywords.visit(self)
666             self.end_test(test)
667
668     def start_test(self, test):
669         """Called when test starts.
670
671         :param test: Test to process.
672         :type test: Test
673         :returns: Nothing.
674         """
675
676         longname_orig = test.longname.lower()
677
678         # Check the ignore list
679         if longname_orig in self._ignore:
680             return
681
682         tags = [str(tag) for tag in test.tags]
683         test_result = dict()
684
685         # Change the TC long name and name if defined in the mapping table
686         longname = self._mapping.get(longname_orig, None)
687         if longname is not None:
688             name = longname.split('.')[-1]
689             logging.debug("{0}\n{1}\n{2}\n{3}".format(
690                 self._data["metadata"], longname_orig, longname, name))
691         else:
692             longname = longname_orig
693             name = test.name.lower()
694
695         # Remove TC number from the TC long name (backward compatibility):
696         self._test_ID = re.sub(self.REGEX_TC_NUMBER, "", longname)
697         # Remove TC number from the TC name (not needed):
698         test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)
699
700         test_result["parent"] = test.parent.name.lower()
701         test_result["tags"] = tags
702         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
703             replace('\r', '').replace('[', ' |br| [')
704         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
705         test_result["msg"] = test.message.replace('\n', ' |br| '). \
706             replace('\r', '').replace('"', "'")
707         test_result["type"] = "FUNC"
708         test_result["status"] = test.status
709
710         if "PERFTEST" in tags:
711             # Replace info about cores (e.g. -1c-) with the info about threads
712             # and cores (e.g. -1t1c-) in the long test case names and in the
713             # test case names if necessary.
714             groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
715             if not groups:
716                 tag_count = 0
717                 for tag in test_result["tags"]:
718                     groups = re.search(self.REGEX_TC_TAG, tag)
719                     if groups:
720                         tag_count += 1
721                         tag_tc = tag
722
723                 if tag_count == 1:
724                     self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
725                                            "-{0}-".format(tag_tc.lower()),
726                                            self._test_ID,
727                                            count=1)
728                     test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
729                                                  "-{0}-".format(tag_tc.lower()),
730                                                  test_result["name"],
731                                                  count=1)
732                 else:
733                     test_result["status"] = "FAIL"
734                     self._data["tests"][self._test_ID] = test_result
735                     logging.debug("The test '{0}' has no or more than one "
736                                   "multi-threading tags.".format(self._test_ID))
737                     logging.debug("Tags: {0}".format(test_result["tags"]))
738                     return
739
740         if test.status == "PASS" and ("NDRPDRDISC" in tags or
741                                       "NDRPDR" in tags or
742                                       "TCP" in tags or
743                                       "MRR" in tags or
744                                       "BMRR" in tags):
745             # TODO: Remove when definitely no NDRPDRDISC tests are used:
746             if "NDRDISC" in tags:
747                 test_result["type"] = "NDR"
748             # TODO: Remove when definitely no NDRPDRDISC tests are used:
749             elif "PDRDISC" in tags:
750                 test_result["type"] = "PDR"
751             elif "NDRPDR" in tags:
752                 test_result["type"] = "NDRPDR"
753             elif "TCP" in tags:
754                 test_result["type"] = "TCP"
755             elif "MRR" in tags:
756                 test_result["type"] = "MRR"
757             elif "FRMOBL" in tags or "BMRR" in tags:
758                 test_result["type"] = "BMRR"
759             else:
760                 test_result["status"] = "FAIL"
761                 self._data["tests"][self._test_ID] = test_result
762                 return
763
764             # TODO: Remove when definitely no NDRPDRDISC tests are used:
765             if test_result["type"] in ("NDR", "PDR"):
766                 try:
767                     rate_value = str(re.search(
768                         self.REGEX_RATE, test.message).group(1))
769                 except AttributeError:
770                     rate_value = "-1"
771                 try:
772                     rate_unit = str(re.search(
773                         self.REGEX_RATE, test.message).group(2))
774                 except AttributeError:
775                     rate_unit = "-1"
776
777                 test_result["throughput"] = dict()
778                 test_result["throughput"]["value"] = \
779                     int(rate_value.split('.')[0])
780                 test_result["throughput"]["unit"] = rate_unit
781                 test_result["latency"] = \
782                     self._get_latency(test.message, test_result["type"])
783                 if test_result["type"] == "PDR":
784                     test_result["lossTolerance"] = str(re.search(
785                         self.REGEX_TOLERANCE, test.message).group(1))
786
787             elif test_result["type"] in ("NDRPDR", ):
788                 test_result["throughput"], test_result["status"] = \
789                     self._get_ndrpdr_throughput(test.message)
790                 test_result["latency"], test_result["status"] = \
791                     self._get_ndrpdr_latency(test.message)
792
793             elif test_result["type"] in ("TCP", ):
794                 groups = re.search(self.REGEX_TCP, test.message)
795                 test_result["result"] = int(groups.group(2))
796
797             elif test_result["type"] in ("MRR", "BMRR"):
798                 test_result["result"] = dict()
799                 groups = re.search(self.REGEX_BMRR, test.message)
800                 if groups is not None:
801                     items_str = groups.group(1)
802                     items_float = [float(item.strip()) for item
803                                    in items_str.split(",")]
804                     metadata = AvgStdevMetadataFactory.from_data(items_float)
805                     test_result["result"]["receive-rate"] = metadata
806                 else:
807                     groups = re.search(self.REGEX_MRR, test.message)
808                     test_result["result"]["receive-rate"] = \
809                         AvgStdevMetadataFactory.from_data([
810                             float(groups.group(3)) / float(groups.group(1)), ])
811
812         self._data["tests"][self._test_ID] = test_result
813
814     def end_test(self, test):
815         """Called when test ends.
816
817         :param test: Test to process.
818         :type test: Test
819         :returns: Nothing.
820         """
821         pass
822
823     def visit_keyword(self, keyword):
824         """Implements traversing through the keyword and its child keywords.
825
826         :param keyword: Keyword to process.
827         :type keyword: Keyword
828         :returns: Nothing.
829         """
830         if self.start_keyword(keyword) is not False:
831             self.end_keyword(keyword)
832
833     def start_keyword(self, keyword):
834         """Called when keyword starts. Default implementation does nothing.
835
836         :param keyword: Keyword to process.
837         :type keyword: Keyword
838         :returns: Nothing.
839         """
840         try:
841             if keyword.type == "setup":
842                 self.visit_setup_kw(keyword)
843             elif keyword.type == "teardown":
844                 self._lookup_kw_nr = 0
845                 self.visit_teardown_kw(keyword)
846             else:
847                 self._lookup_kw_nr = 0
848                 self.visit_test_kw(keyword)
849         except AttributeError:
850             pass
851
852     def end_keyword(self, keyword):
853         """Called when keyword ends. Default implementation does nothing.
854
855         :param keyword: Keyword to process.
856         :type keyword: Keyword
857         :returns: Nothing.
858         """
859         pass
860
861     def visit_test_kw(self, test_kw):
862         """Implements traversing through the test keyword and its child
863         keywords.
864
865         :param test_kw: Keyword to process.
866         :type test_kw: Keyword
867         :returns: Nothing.
868         """
869         for keyword in test_kw.keywords:
870             if self.start_test_kw(keyword) is not False:
871                 self.visit_test_kw(keyword)
872                 self.end_test_kw(keyword)
873
874     def start_test_kw(self, test_kw):
875         """Called when test keyword starts. Default implementation does
876         nothing.
877
878         :param test_kw: Keyword to process.
879         :type test_kw: Keyword
880         :returns: Nothing.
881         """
882         if test_kw.name.count("Show Runtime Counters On All Duts"):
883             self._lookup_kw_nr += 1
884             self._show_run_lookup_nr = 0
885             self._msg_type = "test-show-runtime"
886         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
887             self._msg_type = "dpdk-version"
888         else:
889             return
890         test_kw.messages.visit(self)
891
892     def end_test_kw(self, test_kw):
893         """Called when keyword ends. Default implementation does nothing.
894
895         :param test_kw: Keyword to process.
896         :type test_kw: Keyword
897         :returns: Nothing.
898         """
899         pass
900
901     def visit_setup_kw(self, setup_kw):
902         """Implements traversing through the teardown keyword and its child
903         keywords.
904
905         :param setup_kw: Keyword to process.
906         :type setup_kw: Keyword
907         :returns: Nothing.
908         """
909         for keyword in setup_kw.keywords:
910             if self.start_setup_kw(keyword) is not False:
911                 self.visit_setup_kw(keyword)
912                 self.end_setup_kw(keyword)
913
914     def start_setup_kw(self, setup_kw):
915         """Called when teardown keyword starts. Default implementation does
916         nothing.
917
918         :param setup_kw: Keyword to process.
919         :type setup_kw: Keyword
920         :returns: Nothing.
921         """
922         if setup_kw.name.count("Show Vpp Version On All Duts") \
923                 and not self._version:
924             self._msg_type = "vpp-version"
925
926         elif setup_kw.name.count("Setup performance global Variables") \
927                 and not self._timestamp:
928             self._msg_type = "timestamp"
929         elif setup_kw.name.count("Setup Framework") and not self._testbed:
930             self._msg_type = "testbed"
931         else:
932             return
933         setup_kw.messages.visit(self)
934
935     def end_setup_kw(self, setup_kw):
936         """Called when keyword ends. Default implementation does nothing.
937
938         :param setup_kw: Keyword to process.
939         :type setup_kw: Keyword
940         :returns: Nothing.
941         """
942         pass
943
944     def visit_teardown_kw(self, teardown_kw):
945         """Implements traversing through the teardown keyword and its child
946         keywords.
947
948         :param teardown_kw: Keyword to process.
949         :type teardown_kw: Keyword
950         :returns: Nothing.
951         """
952         for keyword in teardown_kw.keywords:
953             if self.start_teardown_kw(keyword) is not False:
954                 self.visit_teardown_kw(keyword)
955                 self.end_teardown_kw(keyword)
956
957     def start_teardown_kw(self, teardown_kw):
958         """Called when teardown keyword starts. Default implementation does
959         nothing.
960
961         :param teardown_kw: Keyword to process.
962         :type teardown_kw: Keyword
963         :returns: Nothing.
964         """
965
966         if teardown_kw.name.count("Show Vat History On All Duts"):
967             self._vat_history_lookup_nr = 0
968             self._msg_type = "teardown-vat-history"
969             teardown_kw.messages.visit(self)
970
971     def end_teardown_kw(self, teardown_kw):
972         """Called when keyword ends. Default implementation does nothing.
973
974         :param teardown_kw: Keyword to process.
975         :type teardown_kw: Keyword
976         :returns: Nothing.
977         """
978         pass
979
980     def visit_message(self, msg):
981         """Implements visiting the message.
982
983         :param msg: Message to process.
984         :type msg: Message
985         :returns: Nothing.
986         """
987         if self.start_message(msg) is not False:
988             self.end_message(msg)
989
990     def start_message(self, msg):
991         """Called when message starts. Get required information from messages:
992         - VPP version.
993
994         :param msg: Message to process.
995         :type msg: Message
996         :returns: Nothing.
997         """
998
999         if self._msg_type:
1000             self.parse_msg[self._msg_type](msg)
1001
1002     def end_message(self, msg):
1003         """Called when message ends. Default implementation does nothing.
1004
1005         :param msg: Message to process.
1006         :type msg: Message
1007         :returns: Nothing.
1008         """
1009         pass
1010
1011
class InputData(object):
    """Input data

    The data is extracted from output.xml files generated by Jenkins jobs and
    stored in pandas' DataFrames.

    The data structure:
    - job name
      - build number
        - metadata
          (as described in ExecutionChecker documentation)
        - suites
          (as described in ExecutionChecker documentation)
        - tests
          (as described in ExecutionChecker documentation)
    """

    def __init__(self, spec):
        """Initialization.

        :param spec: Specification.
        :type spec: Specification
        """

        # Specification:
        self._cfg = spec

        # Data store: pandas Series indexed by job name, each item indexed by
        # build number (as string); filled by download_and_parse_data().
        self._input_data = pd.Series()

    @property
    def data(self):
        """Getter - Input data.

        :returns: Input data
        :rtype: pandas.Series
        """
        return self._input_data

    def metadata(self, job, build):
        """Getter - metadata

        :param job: Job which metadata we want.
        :param build: Build which metadata we want.
        :type job: str
        :type build: str
        :returns: Metadata
        :rtype: pandas.Series
        """

        return self.data[job][build]["metadata"]

    def suites(self, job, build):
        """Getter - suites

        :param job: Job which suites we want.
        :param build: Build which suites we want.
        :type job: str
        :type build: str
        :returns: Suites.
        :rtype: pandas.Series
        """

        return self.data[job][str(build)]["suites"]

    def tests(self, job, build):
        """Getter - tests

        :param job: Job which tests we want.
        :param build: Build which tests we want.
        :type job: str
        :type build: str
        :returns: Tests.
        :rtype: pandas.Series
        """

        return self.data[job][build]["tests"]

    def _parse_tests(self, job, build, log):
        """Process data from robot output.xml file and return JSON structured
        data.

        :param job: The name of job which build output data will be processed.
        :param build: The build which output data will be processed.
        :param log: List of log messages.
        :type job: str
        :type build: dict
        :type log: list of tuples (severity, msg)
        :returns: JSON data structure, or None if the output.xml file is
            damaged and cannot be parsed.
        :rtype: dict
        """

        metadata = {
            "job": job,
            "build": build
        }

        with open(build["file-name"], 'r') as data_file:
            try:
                # Robot Framework parses the whole output.xml here; a damaged
                # file raises DataError, which is reported via the log list.
                result = ExecutionResult(data_file)
            except errors.DataError as err:
                log.append(("ERROR", "Error occurred while parsing output.xml: "
                                     "{0}".format(err)))
                return None
        # The ExecutionChecker visitor extracts the json-like data structure
        # while traversing the parsed results.
        checker = ExecutionChecker(metadata, self._cfg.mapping,
                                   self._cfg.ignore)
        result.visit(checker)

        return checker.data

    def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
        """Download and parse the input data file.

        Runs in a worker process; the result (parsed data, state and log
        messages) is put to data_queue for the main process to consume.

        :param pid: PID of the process executing this method.
        :param data_queue: Shared memory between processes. Queue which keeps
            the result data. This data is then read by the main process and used
            in further processing.
        :param job: Name of the Jenkins job which generated the processed input
            file.
        :param build: Information about the Jenkins build which generated the
            processed input file.
        :param repeat: Repeat the download specified number of times if not
            successful.
        :type pid: int
        :type data_queue: multiprocessing.Manager().Queue()
        :type job: str
        :type build: dict
        :type repeat: int
        """

        logs = list()

        logging.info("  Processing the job/build: {0}: {1}".
                     format(job, build["build"]))

        logs.append(("INFO", "  Processing the job/build: {0}: {1}".
                     format(job, build["build"])))

        state = "failed"
        success = False
        data = None
        do_repeat = repeat
        # Retry the download up to 'repeat' times.
        while do_repeat:
            success = download_and_unzip_data_file(self._cfg, job, build, pid,
                                                   logs)
            if success:
                break
            do_repeat -= 1
        if not success:
            logs.append(("ERROR", "It is not possible to download the input "
                                  "data file from the job '{job}', build "
                                  "'{build}', or it is damaged. Skipped.".
                         format(job=job, build=build["build"])))
        if success:
            logs.append(("INFO", "  Processing data from the build '{0}' ...".
                         format(build["build"])))
            data = self._parse_tests(job, build, logs)
            if data is None:
                logs.append(("ERROR", "Input data file from the job '{job}', "
                                      "build '{build}' is damaged. Skipped.".
                             format(job=job, build=build["build"])))
            else:
                state = "processed"

            # The downloaded file is no longer needed once parsed (or found
            # damaged); removal failure is logged but not fatal.
            try:
                remove(build["file-name"])
            except OSError as err:
                logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
                             format(build["file-name"], repr(err))))

        # If the time-period is defined in the specification file, remove all
        # files which are outside the time period.
        timeperiod = self._cfg.input.get("time-period", None)
        if timeperiod and data:
            now = dt.utcnow()
            timeperiod = timedelta(int(timeperiod))
            metadata = data.get("metadata", None)
            if metadata:
                generated = metadata.get("generated", None)
                if generated:
                    # "generated" timestamp format produced by the test run,
                    # e.g. "20190101 12:00".
                    generated = dt.strptime(generated, "%Y%m%d %H:%M")
                    if (now - generated) > timeperiod:
                        # Remove the data and the file:
                        state = "removed"
                        data = None
                        logs.append(
                            ("INFO",
                             "    The build {job}/{build} is outdated, will be "
                             "removed".format(job=job, build=build["build"])))
                        file_name = self._cfg.input["file-name"]
                        full_name = join(
                            self._cfg.environment["paths"]["DIR[WORKING,DATA]"],
                            "{job}{sep}{build}{sep}{name}".
                                format(job=job,
                                       sep=SEPARATOR,
                                       build=build["build"],
                                       name=file_name))
                        try:
                            remove(full_name)
                            logs.append(("INFO",
                                         "    The file {name} has been removed".
                                         format(name=full_name)))
                        except OSError as err:
                            logs.append(("ERROR",
                                        "Cannot remove the file '{0}': {1}".
                                        format(full_name, repr(err))))

        logs.append(("INFO", "  Done."))

        # Hand the result (including all collected log messages) back to the
        # main process.
        result = {
            "data": data,
            "state": state,
            "job": job,
            "build": build,
            "logs": logs
        }
        data_queue.put(result)

    def download_and_parse_data(self, repeat=1):
        """Download the input data files, parse input data from input files and
        store in pandas' Series.

        Spawns one worker process per CPU; workers download/parse builds from
        a shared work queue and put results to a shared data queue which is
        drained by this (main) process.

        :param repeat: Repeat the download specified number of times if not
            successful.
        :type repeat: int
        """

        logging.info("Downloading and parsing input files ...")

        work_queue = multiprocessing.JoinableQueue()
        manager = multiprocessing.Manager()
        data_queue = manager.Queue()
        cpus = multiprocessing.cpu_count()

        workers = list()
        for cpu in range(cpus):
            worker = Worker(work_queue,
                            data_queue,
                            self._download_and_parse_build)
            worker.daemon = True
            worker.start()
            workers.append(worker)
            # Pin each worker process to one CPU core; best effort - output
            # and errors of taskset are discarded.
            os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
                      format(cpu, worker.pid))

        # Enqueue every (job, build) pair to be processed by the workers.
        for job, builds in self._cfg.builds.items():
            for build in builds:
                work_queue.put((job, build, repeat))

        # Block until all enqueued items have been processed.
        work_queue.join()

        logging.info("Done.")

        # Drain the result queue and merge the per-build data into the
        # in-memory pandas structure.
        while not data_queue.empty():
            result = data_queue.get()

            job = result["job"]
            build_nr = result["build"]["build"]

            if result["data"]:
                data = result["data"]
                build_data = pd.Series({
                    "metadata": pd.Series(data["metadata"].values(),
                                          index=data["metadata"].keys()),
                    "suites": pd.Series(data["suites"].values(),
                                        index=data["suites"].keys()),
                    "tests": pd.Series(data["tests"].values(),
                                       index=data["tests"].keys())})

                if self._input_data.get(job, None) is None:
                    self._input_data[job] = pd.Series()
                self._input_data[job][str(build_nr)] = build_data

                self._cfg.set_input_file_name(job, build_nr,
                                              result["build"]["file-name"])

            self._cfg.set_input_state(job, build_nr, result["state"])

            # Replay the log messages collected in the worker process.
            for item in result["logs"]:
                if item[0] == "INFO":
                    logging.info(item[1])
                elif item[0] == "ERROR":
                    logging.error(item[1])
                elif item[0] == "DEBUG":
                    logging.debug(item[1])
                elif item[0] == "CRITICAL":
                    logging.critical(item[1])
                elif item[0] == "WARNING":
                    logging.warning(item[1])

        del data_queue

        # Terminate all workers
        for worker in workers:
            worker.terminate()
            worker.join()

        logging.info("Done.")

    @staticmethod
    def _end_of_tag(tag_filter, start=0, closer="'"):
        """Return the index of character in the string which is the end of tag.

        :param tag_filter: The string where the end of tag is being searched.
        :param start: The index where the searching is started.
        :param closer: The character which is the tag closer.
        :type tag_filter: str
        :type start: int
        :type closer: str
        :returns: The index of the tag closer, or None when there is no
            (complete) tag after 'start'.
        :rtype: int
        """

        try:
            # The first occurrence of 'closer' after 'start' opens the tag;
            # the next one closes it.
            idx_opener = tag_filter.index(closer, start)
            return tag_filter.index(closer, idx_opener + 1)
        except ValueError:
            return None

    @staticmethod
    def _condition(tag_filter):
        """Create a conditional statement from the given tag filter.

        Each quoted tag in the filter gets " in tags" appended, e.g.
        "'A' and 'B'" becomes "'A' in tags and 'B' in tags".

        :param tag_filter: Filter based on tags from the element specification.
        :type tag_filter: str
        :returns: Conditional statement which can be evaluated.
        :rtype: str
        """

        index = 0
        while True:
            index = InputData._end_of_tag(tag_filter, index)
            if index is None:
                return tag_filter
            index += 1
            tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]

    def filter_data(self, element, params=None, data_set="tests",
                    continue_on_error=False):
        """Filter required data from the given jobs and builds.

        The output data structure is:

        - job 1
          - build 1
            - test (or suite) 1 ID:
              - param 1
              - param 2
              ...
              - param n
            ...
            - test (or suite) n ID:
            ...
          ...
          - build n
        ...
        - job n

        :param element: Element which will use the filtered data.
        :param params: Parameters which will be included in the output. If None,
        all parameters are included.
        :param data_set: The set of data to be filtered: tests, suites,
        metadata.
        :param continue_on_error: Continue if there is error while reading the
        data. The Item will be empty then
        :type element: pandas.Series
        :type params: list
        :type data_set: str
        :type continue_on_error: bool
        :returns: Filtered data.
        :rtype: pandas.Series
        """

        try:
            if element["filter"] in ("all", "template"):
                cond = "True"
            else:
                cond = InputData._condition(element["filter"])
            logging.debug("   Filter: {0}".format(cond))
        except KeyError:
            logging.error("  No filter defined.")
            return None

        if params is None:
            params = element.get("parameters", None)
            if params:
                params.append("type")

        data = pd.Series()
        try:
            for job, builds in element["data"].items():
                data[job] = pd.Series()
                for build in builds:
                    data[job][str(build)] = pd.Series()
                    try:
                        data_iter = self.data[job][str(build)][data_set].\
                            iteritems()
                    except KeyError:
                        if continue_on_error:
                            continue
                        else:
                            return None
                    for test_ID, test_data in data_iter:
                        # NOTE: 'cond' is built from the element specification
                        # file (trusted input); eval is used here to evaluate
                        # the tag expression against the test's tags.
                        if eval(cond, {"tags": test_data.get("tags", "")}):
                            data[job][str(build)][test_ID] = pd.Series()
                            if params is None:
                                for param, val in test_data.items():
                                    data[job][str(build)][test_ID][param] = val
                            else:
                                for param in params:
                                    try:
                                        data[job][str(build)][test_ID][param] =\
                                            test_data[param]
                                    except KeyError:
                                        data[job][str(build)][test_ID][param] =\
                                            "No Data"
            return data

        except (KeyError, IndexError, ValueError) as err:
            logging.error("   Missing mandatory parameter in the element "
                          "specification: {0}".format(err))
            return None
        except AttributeError:
            return None
        except SyntaxError:
            logging.error("   The filter '{0}' is not correct. Check if all "
                          "tags are enclosed by apostrophes.".format(cond))
            return None

    @staticmethod
    def merge_data(data):
        """Merge data from more jobs and builds to a simple data structure.

        The output data structure is:

        - test (suite) 1 ID:
          - param 1
          - param 2
          ...
          - param n
        ...
        - test (suite) n ID:
        ...

        :param data: Data to merge.
        :type data: pandas.Series
        :returns: Merged data.
        :rtype: pandas.Series
        """

        logging.info("    Merging data ...")

        # Later jobs/builds overwrite earlier entries with the same test ID.
        merged_data = pd.Series()
        for _, builds in data.iteritems():
            for _, item in builds.iteritems():
                for ID, item_data in item.iteritems():
                    merged_data[ID] = item_data

        return merged_data