CSIT-1377: Implement systematic solution using defined time period
[csit.git] / resources / tools / presentation / input_data_parser.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Data pre-processing
15
16 - extract data from output.xml files generated by Jenkins jobs and store in
17   pandas' Series,
18 - provide access to the data.
19 - filter the data using tags,
20 """
21
22 import multiprocessing
23 import os
24 import re
25 import pandas as pd
26 import logging
27
28 from robot.api import ExecutionResult, ResultVisitor
29 from robot import errors
30 from collections import OrderedDict
31 from string import replace
32 from os import remove
33 from os.path import join
34 from datetime import datetime as dt
35 from datetime import timedelta
36 from jumpavg.AvgStdevMetadataFactory import AvgStdevMetadataFactory
37
38 from input_data_files import download_and_unzip_data_file
39 from utils import Worker
40
41
42 # Separator used in file names
43 SEPARATOR = "__"
44
45
46 class ExecutionChecker(ResultVisitor):
47     """Class to traverse through the test suite structure.
48
49     The functionality implemented in this class generates a json structure:
50
51     Performance tests:
52
53     {
54         "metadata": {
55             "generated": "Timestamp",
56             "version": "SUT version",
57             "job": "Jenkins job name",
58             "build": "Information about the build"
59         },
60         "suites": {
61             "Suite long name 1": {
62                 "name": Suite name,
63                 "doc": "Suite 1 documentation",
64                 "parent": "Suite 1 parent",
65                 "level": "Level of the suite in the suite hierarchy"
66             }
67             "Suite long name N": {
68                 "name": Suite name,
69                 "doc": "Suite N documentation",
70                 "parent": "Suite 2 parent",
71                 "level": "Level of the suite in the suite hierarchy"
72             }
73         }
74         "tests": {
75             # NDRPDR tests:
76             "ID": {
77                 "name": "Test name",
78                 "parent": "Name of the parent of the test",
79                 "doc": "Test documentation",
80                 "msg": "Test message",
81                 "vat-history": "DUT1 and DUT2 VAT History",
82                 "show-run": "Show Run",
83                 "tags": ["tag 1", "tag 2", "tag n"],
84                 "type": "NDRPDR",
85                 "status": "PASS" | "FAIL",
86                 "throughput": {
87                     "NDR": {
88                         "LOWER": float,
89                         "UPPER": float
90                     },
91                     "PDR": {
92                         "LOWER": float,
93                         "UPPER": float
94                     }
95                 },
96                 "latency": {
97                     "NDR": {
98                         "direction1": {
99                             "min": float,
100                             "avg": float,
101                             "max": float
102                         },
103                         "direction2": {
104                             "min": float,
105                             "avg": float,
106                             "max": float
107                         }
108                     },
109                     "PDR": {
110                         "direction1": {
111                             "min": float,
112                             "avg": float,
113                             "max": float
114                         },
115                         "direction2": {
116                             "min": float,
117                             "avg": float,
118                             "max": float
119                         }
120                     }
121                 }
122             }
123
124             # TCP tests:
125             "ID": {
126                 "name": "Test name",
127                 "parent": "Name of the parent of the test",
128                 "doc": "Test documentation",
129                 "msg": "Test message",
130                 "tags": ["tag 1", "tag 2", "tag n"],
131                 "type": "TCP",
132                 "status": "PASS" | "FAIL",
133                 "result": int
134             }
135
136             # MRR, BMRR tests:
137             "ID": {
138                 "name": "Test name",
139                 "parent": "Name of the parent of the test",
140                 "doc": "Test documentation",
141                 "msg": "Test message",
142                 "tags": ["tag 1", "tag 2", "tag n"],
143                 "type": "MRR" | "BMRR",
144                 "status": "PASS" | "FAIL",
145                 "result": {
146                     "receive-rate": AvgStdevMetadata,
147                 }
148             }
149
150             # TODO: Remove when definitely no NDRPDRDISC tests are used:
151             # NDRPDRDISC tests:
152             "ID": {
153                 "name": "Test name",
154                 "parent": "Name of the parent of the test",
155                 "doc": "Test documentation",
156                 "msg": "Test message",
157                 "tags": ["tag 1", "tag 2", "tag n"],
158                 "type": "PDR" | "NDR",
159                 "status": "PASS" | "FAIL",
160                 "throughput": {  # Only type: "PDR" | "NDR"
161                     "value": int,
162                     "unit": "pps" | "bps" | "percentage"
163                 },
164                 "latency": {  # Only type: "PDR" | "NDR"
165                     "direction1": {
166                         "100": {
167                             "min": int,
168                             "avg": int,
169                             "max": int
170                         },
171                         "50": {  # Only for NDR
172                             "min": int,
173                             "avg": int,
174                             "max": int
175                         },
176                         "10": {  # Only for NDR
177                             "min": int,
178                             "avg": int,
179                             "max": int
180                         }
181                     },
182                     "direction2": {
183                         "100": {
184                             "min": int,
185                             "avg": int,
186                             "max": int
187                         },
188                         "50": {  # Only for NDR
189                             "min": int,
190                             "avg": int,
191                             "max": int
192                         },
193                         "10": {  # Only for NDR
194                             "min": int,
195                             "avg": int,
196                             "max": int
197                         }
198                     }
199                 },
200                 "lossTolerance": "lossTolerance",  # Only type: "PDR"
201                 "vat-history": "DUT1 and DUT2 VAT History"
202                 "show-run": "Show Run"
203             },
204             "ID" {
205                 # next test
206             }
207         }
208     }
209
210
211     Functional tests:
212
213     {
214         "metadata": {  # Optional
215             "version": "VPP version",
216             "job": "Jenkins job name",
217             "build": "Information about the build"
218         },
219         "suites": {
220             "Suite name 1": {
221                 "doc": "Suite 1 documentation",
222                 "parent": "Suite 1 parent",
223                 "level": "Level of the suite in the suite hierarchy"
224             }
225             "Suite name N": {
226                 "doc": "Suite N documentation",
227                 "parent": "Suite 2 parent",
228                 "level": "Level of the suite in the suite hierarchy"
229             }
230         }
231         "tests": {
232             "ID": {
233                 "name": "Test name",
234                 "parent": "Name of the parent of the test",
235                 "doc": "Test documentation"
236                 "msg": "Test message"
237                 "tags": ["tag 1", "tag 2", "tag n"],
238                 "vat-history": "DUT1 and DUT2 VAT History"
239                 "show-run": "Show Run"
240                 "status": "PASS" | "FAIL"
241             },
242             "ID" {
243                 # next test
244             }
245         }
246     }
247
248     .. note:: ID is the lowercase full path to the test.
249     """
250
251     # TODO: Remove when definitely no NDRPDRDISC tests are used:
252     REGEX_RATE = re.compile(r'^[\D\d]*FINAL_RATE:\s(\d+\.\d+)\s(\w+)')
253
254     REGEX_NDRPDR_RATE = re.compile(r'NDR_LOWER:\s(\d+.\d+).*\n.*\n'
255                                    r'NDR_UPPER:\s(\d+.\d+).*\n'
256                                    r'PDR_LOWER:\s(\d+.\d+).*\n.*\n'
257                                    r'PDR_UPPER:\s(\d+.\d+)')
258
259     # TODO: Remove when definitely no NDRPDRDISC tests are used:
260     REGEX_LAT_NDR = re.compile(r'^[\D\d]*'
261                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
262                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
263                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
264                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]\s\n'
265                                r'LAT_\d+%NDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
266                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\]')
267
268     REGEX_LAT_PDR = re.compile(r'^[\D\d]*'
269                                r'LAT_\d+%PDR:\s\[\'(-?\d+/-?\d+/-?\d+)\','
270                                r'\s\'(-?\d+/-?\d+/-?\d+)\'\][\D\d]*')
271
272     REGEX_NDRPDR_LAT = re.compile(r'LATENCY.*\[\'(.*)\', \'(.*)\'\]\s\n.*\n.*\n'
273                                   r'LATENCY.*\[\'(.*)\', \'(.*)\'\]')
274
275     REGEX_TOLERANCE = re.compile(r'^[\D\d]*LOSS_ACCEPTANCE:\s(\d*\.\d*)\s'
276                                  r'[\D\d]*')
277
278     REGEX_VERSION_VPP = re.compile(r"(return STDOUT Version:\s*)(.*)")
279
280     REGEX_VERSION_DPDK = re.compile(r"(return STDOUT testpmd)([\d\D\n]*)"
281                                     r"(RTE Version: 'DPDK )(.*)(')")
282
283     REGEX_TCP = re.compile(r'Total\s(rps|cps|throughput):\s([0-9]*).*$')
284
285     REGEX_MRR = re.compile(r'MaxReceivedRate_Results\s\[pkts/(\d*)sec\]:\s'
286                            r'tx\s(\d*),\srx\s(\d*)')
287
288     REGEX_BMRR = re.compile(r'Maximum Receive Rate trial results'
289                             r' in packets per second: \[(.*)\]')
290
291     REGEX_TC_TAG = re.compile(r'\d+[tT]\d+[cC]')
292
293     REGEX_TC_NAME_OLD = re.compile(r'-\d+[tT]\d+[cC]-')
294
295     REGEX_TC_NAME_NEW = re.compile(r'-\d+[cC]-')
296
297     REGEX_TC_NUMBER = re.compile(r'tc[0-9]{2}-')
298
299     def __init__(self, metadata, mapping, ignore):
300         """Initialisation.
301
302         :param metadata: Key-value pairs to be included in "metadata" part of
303             JSON structure.
304         :param mapping: Mapping of the old names of test cases to the new
305             (actual) one.
306         :param ignore: List of TCs to be ignored.
307         :type metadata: dict
308         :type mapping: dict
309         :type ignore: list
310         """
311
312         # Type of message to parse out from the test messages
313         self._msg_type = None
314
315         # VPP version
316         self._version = None
317
318         # Timestamp
319         self._timestamp = None
320
321         # Mapping of TCs long names
322         self._mapping = mapping
323
324         # Ignore list
325         self._ignore = ignore
326
327         # Number of VAT History messages found:
328         # 0 - no message
329         # 1 - VAT History of DUT1
330         # 2 - VAT History of DUT2
331         self._lookup_kw_nr = 0
332         self._vat_history_lookup_nr = 0
333
334         # Number of Show Running messages found
335         # 0 - no message
336         # 1 - Show run message found
337         self._show_run_lookup_nr = 0
338
339         # Test ID of currently processed test- the lowercase full path to the
340         # test
341         self._test_ID = None
342
343         # The main data structure
344         self._data = {
345             "metadata": OrderedDict(),
346             "suites": OrderedDict(),
347             "tests": OrderedDict()
348         }
349
350         # Save the provided metadata
351         for key, val in metadata.items():
352             self._data["metadata"][key] = val
353
354         # Dictionary defining the methods used to parse different types of
355         # messages
356         self.parse_msg = {
357             "timestamp": self._get_timestamp,
358             "vpp-version": self._get_vpp_version,
359             "dpdk-version": self._get_dpdk_version,
360             "teardown-vat-history": self._get_vat_history,
361             "test-show-runtime": self._get_show_run
362         }
363
364     @property
365     def data(self):
366         """Getter - Data parsed from the XML file.
367
368         :returns: Data parsed from the XML file.
369         :rtype: dict
370         """
371         return self._data
372
373     def _get_vpp_version(self, msg):
374         """Called when extraction of VPP version is required.
375
376         :param msg: Message to process.
377         :type msg: Message
378         :returns: Nothing.
379         """
380
381         if msg.message.count("return STDOUT Version:"):
382             self._version = str(re.search(self.REGEX_VERSION_VPP, msg.message).
383                                 group(2))
384             self._data["metadata"]["version"] = self._version
385             self._msg_type = None
386
387     def _get_dpdk_version(self, msg):
388         """Called when extraction of DPDK version is required.
389
390         :param msg: Message to process.
391         :type msg: Message
392         :returns: Nothing.
393         """
394
395         if msg.message.count("return STDOUT testpmd"):
396             try:
397                 self._version = str(re.search(
398                     self.REGEX_VERSION_DPDK, msg.message). group(4))
399                 self._data["metadata"]["version"] = self._version
400             except IndexError:
401                 pass
402             finally:
403                 self._msg_type = None
404
405     def _get_timestamp(self, msg):
406         """Called when extraction of timestamp is required.
407
408         :param msg: Message to process.
409         :type msg: Message
410         :returns: Nothing.
411         """
412
413         self._timestamp = msg.timestamp[:14]
414         self._data["metadata"]["generated"] = self._timestamp
415         self._msg_type = None
416
417     def _get_vat_history(self, msg):
418         """Called when extraction of VAT command history is required.
419
420         :param msg: Message to process.
421         :type msg: Message
422         :returns: Nothing.
423         """
424         if msg.message.count("VAT command history:"):
425             self._vat_history_lookup_nr += 1
426             if self._vat_history_lookup_nr == 1:
427                 self._data["tests"][self._test_ID]["vat-history"] = str()
428             else:
429                 self._msg_type = None
430             text = re.sub("[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3} "
431                           "VAT command history:", "", msg.message, count=1). \
432                 replace("\n\n", "\n").replace('\n', ' |br| ').\
433                 replace('\r', '').replace('"', "'")
434
435             self._data["tests"][self._test_ID]["vat-history"] += " |br| "
436             self._data["tests"][self._test_ID]["vat-history"] += \
437                 "**DUT" + str(self._vat_history_lookup_nr) + ":** " + text
438
439     def _get_show_run(self, msg):
440         """Called when extraction of VPP operational data (output of CLI command
441         Show Runtime) is required.
442
443         :param msg: Message to process.
444         :type msg: Message
445         :returns: Nothing.
446         """
447         if msg.message.count("return STDOUT Thread "):
448             self._show_run_lookup_nr += 1
449             if self._lookup_kw_nr == 1 and self._show_run_lookup_nr == 1:
450                 self._data["tests"][self._test_ID]["show-run"] = str()
451             if self._lookup_kw_nr > 1:
452                 self._msg_type = None
453             if self._show_run_lookup_nr == 1:
454                 text = msg.message.replace("vat# ", "").\
455                     replace("return STDOUT ", "").replace("\n\n", "\n").\
456                     replace('\n', ' |br| ').\
457                     replace('\r', '').replace('"', "'")
458                 try:
459                     self._data["tests"][self._test_ID]["show-run"] += " |br| "
460                     self._data["tests"][self._test_ID]["show-run"] += \
461                         "**DUT" + str(self._lookup_kw_nr) + ":** |br| " + text
462                 except KeyError:
463                     pass
464
465     # TODO: Remove when definitely no NDRPDRDISC tests are used:
466     def _get_latency(self, msg, test_type):
467         """Get the latency data from the test message.
468
469         :param msg: Message to be parsed.
470         :param test_type: Type of the test - NDR or PDR.
471         :type msg: str
472         :type test_type: str
473         :returns: Latencies parsed from the message.
474         :rtype: dict
475         """
476
477         if test_type == "NDR":
478             groups = re.search(self.REGEX_LAT_NDR, msg)
479             groups_range = range(1, 7)
480         elif test_type == "PDR":
481             groups = re.search(self.REGEX_LAT_PDR, msg)
482             groups_range = range(1, 3)
483         else:
484             return {}
485
486         latencies = list()
487         for idx in groups_range:
488             try:
489                 lat = [int(item) for item in str(groups.group(idx)).split('/')]
490             except (AttributeError, ValueError):
491                 lat = [-1, -1, -1]
492             latencies.append(lat)
493
494         keys = ("min", "avg", "max")
495         latency = {
496             "direction1": {
497             },
498             "direction2": {
499             }
500         }
501
502         latency["direction1"]["100"] = dict(zip(keys, latencies[0]))
503         latency["direction2"]["100"] = dict(zip(keys, latencies[1]))
504         if test_type == "NDR":
505             latency["direction1"]["50"] = dict(zip(keys, latencies[2]))
506             latency["direction2"]["50"] = dict(zip(keys, latencies[3]))
507             latency["direction1"]["10"] = dict(zip(keys, latencies[4]))
508             latency["direction2"]["10"] = dict(zip(keys, latencies[5]))
509
510         return latency
511
512     def _get_ndrpdr_throughput(self, msg):
513         """Get NDR_LOWER, NDR_UPPER, PDR_LOWER and PDR_UPPER from the test
514         message.
515
516         :param msg: The test message to be parsed.
517         :type msg: str
518         :returns: Parsed data as a dict and the status (PASS/FAIL).
519         :rtype: tuple(dict, str)
520         """
521
522         throughput = {
523             "NDR": {"LOWER": -1.0, "UPPER": -1.0},
524             "PDR": {"LOWER": -1.0, "UPPER": -1.0}
525         }
526         status = "FAIL"
527         groups = re.search(self.REGEX_NDRPDR_RATE, msg)
528
529         if groups is not None:
530             try:
531                 throughput["NDR"]["LOWER"] = float(groups.group(1))
532                 throughput["NDR"]["UPPER"] = float(groups.group(2))
533                 throughput["PDR"]["LOWER"] = float(groups.group(3))
534                 throughput["PDR"]["UPPER"] = float(groups.group(4))
535                 status = "PASS"
536             except (IndexError, ValueError):
537                 pass
538
539         return throughput, status
540
541     def _get_ndrpdr_latency(self, msg):
542         """Get LATENCY from the test message.
543
544         :param msg: The test message to be parsed.
545         :type msg: str
546         :returns: Parsed data as a dict and the status (PASS/FAIL).
547         :rtype: tuple(dict, str)
548         """
549
550         latency = {
551             "NDR": {
552                 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
553                 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
554             },
555             "PDR": {
556                 "direction1": {"min": -1.0, "avg": -1.0, "max": -1.0},
557                 "direction2": {"min": -1.0, "avg": -1.0, "max": -1.0}
558             }
559         }
560         status = "FAIL"
561         groups = re.search(self.REGEX_NDRPDR_LAT, msg)
562
563         if groups is not None:
564             keys = ("min", "avg", "max")
565             try:
566                 latency["NDR"]["direction1"] = dict(
567                     zip(keys, [float(l) for l in groups.group(1).split('/')]))
568                 latency["NDR"]["direction2"] = dict(
569                     zip(keys, [float(l) for l in groups.group(2).split('/')]))
570                 latency["PDR"]["direction1"] = dict(
571                     zip(keys, [float(l) for l in groups.group(3).split('/')]))
572                 latency["PDR"]["direction2"] = dict(
573                     zip(keys, [float(l) for l in groups.group(4).split('/')]))
574                 status = "PASS"
575             except (IndexError, ValueError):
576                 pass
577
578         return latency, status
579
580     def visit_suite(self, suite):
581         """Implements traversing through the suite and its direct children.
582
583         :param suite: Suite to process.
584         :type suite: Suite
585         :returns: Nothing.
586         """
587         if self.start_suite(suite) is not False:
588             suite.suites.visit(self)
589             suite.tests.visit(self)
590             self.end_suite(suite)
591
592     def start_suite(self, suite):
593         """Called when suite starts.
594
595         :param suite: Suite to process.
596         :type suite: Suite
597         :returns: Nothing.
598         """
599
600         try:
601             parent_name = suite.parent.name
602         except AttributeError:
603             return
604
605         doc_str = suite.doc.replace('"', "'").replace('\n', ' ').\
606             replace('\r', '').replace('*[', ' |br| *[').replace("*", "**")
607         doc_str = replace(doc_str, ' |br| *[', '*[', maxreplace=1)
608
609         self._data["suites"][suite.longname.lower().replace('"', "'").
610             replace(" ", "_")] = {
611                 "name": suite.name.lower(),
612                 "doc": doc_str,
613                 "parent": parent_name,
614                 "level": len(suite.longname.split("."))
615             }
616
617         suite.keywords.visit(self)
618
619     def end_suite(self, suite):
620         """Called when suite ends.
621
622         :param suite: Suite to process.
623         :type suite: Suite
624         :returns: Nothing.
625         """
626         pass
627
628     def visit_test(self, test):
629         """Implements traversing through the test.
630
631         :param test: Test to process.
632         :type test: Test
633         :returns: Nothing.
634         """
635         if self.start_test(test) is not False:
636             test.keywords.visit(self)
637             self.end_test(test)
638
639     def start_test(self, test):
640         """Called when test starts.
641
642         :param test: Test to process.
643         :type test: Test
644         :returns: Nothing.
645         """
646
647         longname_orig = test.longname.lower()
648
649         # Check the ignore list
650         if longname_orig in self._ignore:
651             return
652
653         tags = [str(tag) for tag in test.tags]
654         test_result = dict()
655
656         # Change the TC long name and name if defined in the mapping table
657         longname = self._mapping.get(longname_orig, None)
658         if longname is not None:
659             name = longname.split('.')[-1]
660             logging.debug("{0}\n{1}\n{2}\n{3}".format(
661                 self._data["metadata"], longname_orig, longname, name))
662         else:
663             longname = longname_orig
664             name = test.name.lower()
665
666         # Remove TC number from the TC long name (backward compatibility):
667         self._test_ID = re.sub(self.REGEX_TC_NUMBER, "", longname)
668         # Remove TC number from the TC name (not needed):
669         test_result["name"] = re.sub(self.REGEX_TC_NUMBER, "", name)
670
671         test_result["parent"] = test.parent.name.lower()
672         test_result["tags"] = tags
673         doc_str = test.doc.replace('"', "'").replace('\n', ' '). \
674             replace('\r', '').replace('[', ' |br| [')
675         test_result["doc"] = replace(doc_str, ' |br| [', '[', maxreplace=1)
676         test_result["msg"] = test.message.replace('\n', ' |br| '). \
677             replace('\r', '').replace('"', "'")
678         test_result["type"] = "FUNC"
679         test_result["status"] = test.status
680
681         if "PERFTEST" in tags:
682             # Replace info about cores (e.g. -1c-) with the info about threads
683             # and cores (e.g. -1t1c-) in the long test case names and in the
684             # test case names if necessary.
685             groups = re.search(self.REGEX_TC_NAME_OLD, self._test_ID)
686             if not groups:
687                 tag_count = 0
688                 for tag in test_result["tags"]:
689                     groups = re.search(self.REGEX_TC_TAG, tag)
690                     if groups:
691                         tag_count += 1
692                         tag_tc = tag
693
694                 if tag_count == 1:
695                     self._test_ID = re.sub(self.REGEX_TC_NAME_NEW,
696                                            "-{0}-".format(tag_tc.lower()),
697                                            self._test_ID,
698                                            count=1)
699                     test_result["name"] = re.sub(self.REGEX_TC_NAME_NEW,
700                                                  "-{0}-".format(tag_tc.lower()),
701                                                  test_result["name"],
702                                                  count=1)
703                 else:
704                     test_result["status"] = "FAIL"
705                     self._data["tests"][self._test_ID] = test_result
706                     logging.debug("The test '{0}' has no or more than one "
707                                   "multi-threading tags.".format(self._test_ID))
708                     logging.debug("Tags: {0}".format(test_result["tags"]))
709                     return
710
711         if test.status == "PASS" and ("NDRPDRDISC" in tags or
712                                       "NDRPDR" in tags or
713                                       "TCP" in tags or
714                                       "MRR" in tags or
715                                       "BMRR" in tags):
716             # TODO: Remove when definitely no NDRPDRDISC tests are used:
717             if "NDRDISC" in tags:
718                 test_result["type"] = "NDR"
719             # TODO: Remove when definitely no NDRPDRDISC tests are used:
720             elif "PDRDISC" in tags:
721                 test_result["type"] = "PDR"
722             elif "NDRPDR" in tags:
723                 test_result["type"] = "NDRPDR"
724             elif "TCP" in tags:
725                 test_result["type"] = "TCP"
726             elif "MRR" in tags:
727                 test_result["type"] = "MRR"
728             elif "FRMOBL" in tags or "BMRR" in tags:
729                 test_result["type"] = "BMRR"
730             else:
731                 test_result["status"] = "FAIL"
732                 self._data["tests"][self._test_ID] = test_result
733                 return
734
735             # TODO: Remove when definitely no NDRPDRDISC tests are used:
736             if test_result["type"] in ("NDR", "PDR"):
737                 try:
738                     rate_value = str(re.search(
739                         self.REGEX_RATE, test.message).group(1))
740                 except AttributeError:
741                     rate_value = "-1"
742                 try:
743                     rate_unit = str(re.search(
744                         self.REGEX_RATE, test.message).group(2))
745                 except AttributeError:
746                     rate_unit = "-1"
747
748                 test_result["throughput"] = dict()
749                 test_result["throughput"]["value"] = \
750                     int(rate_value.split('.')[0])
751                 test_result["throughput"]["unit"] = rate_unit
752                 test_result["latency"] = \
753                     self._get_latency(test.message, test_result["type"])
754                 if test_result["type"] == "PDR":
755                     test_result["lossTolerance"] = str(re.search(
756                         self.REGEX_TOLERANCE, test.message).group(1))
757
758             elif test_result["type"] in ("NDRPDR", ):
759                 test_result["throughput"], test_result["status"] = \
760                     self._get_ndrpdr_throughput(test.message)
761                 test_result["latency"], test_result["status"] = \
762                     self._get_ndrpdr_latency(test.message)
763
764             elif test_result["type"] in ("TCP", ):
765                 groups = re.search(self.REGEX_TCP, test.message)
766                 test_result["result"] = int(groups.group(2))
767
768             elif test_result["type"] in ("MRR", "BMRR"):
769                 test_result["result"] = dict()
770                 groups = re.search(self.REGEX_BMRR, test.message)
771                 if groups is not None:
772                     items_str = groups.group(1)
773                     items_float = [float(item.strip()) for item
774                                    in items_str.split(",")]
775                     metadata = AvgStdevMetadataFactory.from_data(items_float)
776                     # Next two lines have been introduced in CSIT-1179,
777                     # to be removed in CSIT-1180.
778                     metadata.size = 1
779                     metadata.stdev = 0.0
780                     test_result["result"]["receive-rate"] = metadata
781                 else:
782                     groups = re.search(self.REGEX_MRR, test.message)
783                     test_result["result"]["receive-rate"] = \
784                         AvgStdevMetadataFactory.from_data([
785                             float(groups.group(3)) / float(groups.group(1)), ])
786
787         self._data["tests"][self._test_ID] = test_result
788
789     def end_test(self, test):
790         """Called when test ends.
791
792         :param test: Test to process.
793         :type test: Test
794         :returns: Nothing.
795         """
796         pass
797
798     def visit_keyword(self, keyword):
799         """Implements traversing through the keyword and its child keywords.
800
801         :param keyword: Keyword to process.
802         :type keyword: Keyword
803         :returns: Nothing.
804         """
805         if self.start_keyword(keyword) is not False:
806             self.end_keyword(keyword)
807
808     def start_keyword(self, keyword):
809         """Called when keyword starts. Default implementation does nothing.
810
811         :param keyword: Keyword to process.
812         :type keyword: Keyword
813         :returns: Nothing.
814         """
815         try:
816             if keyword.type == "setup":
817                 self.visit_setup_kw(keyword)
818             elif keyword.type == "teardown":
819                 self._lookup_kw_nr = 0
820                 self.visit_teardown_kw(keyword)
821             else:
822                 self._lookup_kw_nr = 0
823                 self.visit_test_kw(keyword)
824         except AttributeError:
825             pass
826
827     def end_keyword(self, keyword):
828         """Called when keyword ends. Default implementation does nothing.
829
830         :param keyword: Keyword to process.
831         :type keyword: Keyword
832         :returns: Nothing.
833         """
834         pass
835
836     def visit_test_kw(self, test_kw):
837         """Implements traversing through the test keyword and its child
838         keywords.
839
840         :param test_kw: Keyword to process.
841         :type test_kw: Keyword
842         :returns: Nothing.
843         """
844         for keyword in test_kw.keywords:
845             if self.start_test_kw(keyword) is not False:
846                 self.visit_test_kw(keyword)
847                 self.end_test_kw(keyword)
848
849     def start_test_kw(self, test_kw):
850         """Called when test keyword starts. Default implementation does
851         nothing.
852
853         :param test_kw: Keyword to process.
854         :type test_kw: Keyword
855         :returns: Nothing.
856         """
857         if test_kw.name.count("Show Runtime Counters On All Duts"):
858             self._lookup_kw_nr += 1
859             self._show_run_lookup_nr = 0
860             self._msg_type = "test-show-runtime"
861         elif test_kw.name.count("Start The L2fwd Test") and not self._version:
862             self._msg_type = "dpdk-version"
863         else:
864             return
865         test_kw.messages.visit(self)
866
867     def end_test_kw(self, test_kw):
868         """Called when keyword ends. Default implementation does nothing.
869
870         :param test_kw: Keyword to process.
871         :type test_kw: Keyword
872         :returns: Nothing.
873         """
874         pass
875
876     def visit_setup_kw(self, setup_kw):
877         """Implements traversing through the teardown keyword and its child
878         keywords.
879
880         :param setup_kw: Keyword to process.
881         :type setup_kw: Keyword
882         :returns: Nothing.
883         """
884         for keyword in setup_kw.keywords:
885             if self.start_setup_kw(keyword) is not False:
886                 self.visit_setup_kw(keyword)
887                 self.end_setup_kw(keyword)
888
889     def start_setup_kw(self, setup_kw):
890         """Called when teardown keyword starts. Default implementation does
891         nothing.
892
893         :param setup_kw: Keyword to process.
894         :type setup_kw: Keyword
895         :returns: Nothing.
896         """
897         if setup_kw.name.count("Show Vpp Version On All Duts") \
898                 and not self._version:
899             self._msg_type = "vpp-version"
900
901         elif setup_kw.name.count("Setup performance global Variables") \
902                 and not self._timestamp:
903             self._msg_type = "timestamp"
904         else:
905             return
906         setup_kw.messages.visit(self)
907
908     def end_setup_kw(self, setup_kw):
909         """Called when keyword ends. Default implementation does nothing.
910
911         :param setup_kw: Keyword to process.
912         :type setup_kw: Keyword
913         :returns: Nothing.
914         """
915         pass
916
917     def visit_teardown_kw(self, teardown_kw):
918         """Implements traversing through the teardown keyword and its child
919         keywords.
920
921         :param teardown_kw: Keyword to process.
922         :type teardown_kw: Keyword
923         :returns: Nothing.
924         """
925         for keyword in teardown_kw.keywords:
926             if self.start_teardown_kw(keyword) is not False:
927                 self.visit_teardown_kw(keyword)
928                 self.end_teardown_kw(keyword)
929
930     def start_teardown_kw(self, teardown_kw):
931         """Called when teardown keyword starts. Default implementation does
932         nothing.
933
934         :param teardown_kw: Keyword to process.
935         :type teardown_kw: Keyword
936         :returns: Nothing.
937         """
938
939         if teardown_kw.name.count("Show Vat History On All Duts"):
940             self._vat_history_lookup_nr = 0
941             self._msg_type = "teardown-vat-history"
942             teardown_kw.messages.visit(self)
943
944     def end_teardown_kw(self, teardown_kw):
945         """Called when keyword ends. Default implementation does nothing.
946
947         :param teardown_kw: Keyword to process.
948         :type teardown_kw: Keyword
949         :returns: Nothing.
950         """
951         pass
952
953     def visit_message(self, msg):
954         """Implements visiting the message.
955
956         :param msg: Message to process.
957         :type msg: Message
958         :returns: Nothing.
959         """
960         if self.start_message(msg) is not False:
961             self.end_message(msg)
962
963     def start_message(self, msg):
964         """Called when message starts. Get required information from messages:
965         - VPP version.
966
967         :param msg: Message to process.
968         :type msg: Message
969         :returns: Nothing.
970         """
971
972         if self._msg_type:
973             self.parse_msg[self._msg_type](msg)
974
975     def end_message(self, msg):
976         """Called when message ends. Default implementation does nothing.
977
978         :param msg: Message to process.
979         :type msg: Message
980         :returns: Nothing.
981         """
982         pass
983
984
985 class InputData(object):
986     """Input data
987
988     The data is extracted from output.xml files generated by Jenkins jobs and
989     stored in pandas' DataFrames.
990
991     The data structure:
992     - job name
993       - build number
994         - metadata
995           (as described in ExecutionChecker documentation)
996         - suites
997           (as described in ExecutionChecker documentation)
998         - tests
999           (as described in ExecutionChecker documentation)
1000     """
1001
1002     def __init__(self, spec):
1003         """Initialization.
1004
1005         :param spec: Specification.
1006         :type spec: Specification
1007         """
1008
1009         # Specification:
1010         self._cfg = spec
1011
1012         # Data store:
1013         self._input_data = pd.Series()
1014
1015     @property
1016     def data(self):
1017         """Getter - Input data.
1018
1019         :returns: Input data
1020         :rtype: pandas.Series
1021         """
1022         return self._input_data
1023
1024     def metadata(self, job, build):
1025         """Getter - metadata
1026
1027         :param job: Job which metadata we want.
1028         :param build: Build which metadata we want.
1029         :type job: str
1030         :type build: str
1031         :returns: Metadata
1032         :rtype: pandas.Series
1033         """
1034
1035         return self.data[job][build]["metadata"]
1036
1037     def suites(self, job, build):
1038         """Getter - suites
1039
1040         :param job: Job which suites we want.
1041         :param build: Build which suites we want.
1042         :type job: str
1043         :type build: str
1044         :returns: Suites.
1045         :rtype: pandas.Series
1046         """
1047
1048         return self.data[job][str(build)]["suites"]
1049
1050     def tests(self, job, build):
1051         """Getter - tests
1052
1053         :param job: Job which tests we want.
1054         :param build: Build which tests we want.
1055         :type job: str
1056         :type build: str
1057         :returns: Tests.
1058         :rtype: pandas.Series
1059         """
1060
1061         return self.data[job][build]["tests"]
1062
1063     def _parse_tests(self, job, build, log):
1064         """Process data from robot output.xml file and return JSON structured
1065         data.
1066
1067         :param job: The name of job which build output data will be processed.
1068         :param build: The build which output data will be processed.
1069         :param log: List of log messages.
1070         :type job: str
1071         :type build: dict
1072         :type log: list of tuples (severity, msg)
1073         :returns: JSON data structure.
1074         :rtype: dict
1075         """
1076
1077         metadata = {
1078             "job": job,
1079             "build": build
1080         }
1081
1082         with open(build["file-name"], 'r') as data_file:
1083             try:
1084                 result = ExecutionResult(data_file)
1085             except errors.DataError as err:
1086                 log.append(("ERROR", "Error occurred while parsing output.xml: "
1087                                      "{0}".format(err)))
1088                 return None
1089         checker = ExecutionChecker(metadata, self._cfg.mapping,
1090                                    self._cfg.ignore)
1091         result.visit(checker)
1092
1093         return checker.data
1094
1095     def _download_and_parse_build(self, pid, data_queue, job, build, repeat):
1096         """Download and parse the input data file.
1097
1098         :param pid: PID of the process executing this method.
1099         :param data_queue: Shared memory between processes. Queue which keeps
1100             the result data. This data is then read by the main process and used
1101             in further processing.
1102         :param job: Name of the Jenkins job which generated the processed input
1103             file.
1104         :param build: Information about the Jenkins build which generated the
1105             processed input file.
1106         :param repeat: Repeat the download specified number of times if not
1107             successful.
1108         :type pid: int
1109         :type data_queue: multiprocessing.Manager().Queue()
1110         :type job: str
1111         :type build: dict
1112         :type repeat: int
1113         """
1114
1115         logs = list()
1116
1117         logging.info("  Processing the job/build: {0}: {1}".
1118                      format(job, build["build"]))
1119
1120         logs.append(("INFO", "  Processing the job/build: {0}: {1}".
1121                      format(job, build["build"])))
1122
1123         state = "failed"
1124         success = False
1125         data = None
1126         do_repeat = repeat
1127         while do_repeat:
1128             success = download_and_unzip_data_file(self._cfg, job, build, pid,
1129                                                    logs)
1130             if success:
1131                 break
1132             do_repeat -= 1
1133         if not success:
1134             logs.append(("ERROR", "It is not possible to download the input "
1135                                   "data file from the job '{job}', build "
1136                                   "'{build}', or it is damaged. Skipped.".
1137                          format(job=job, build=build["build"])))
1138         if success:
1139             logs.append(("INFO", "  Processing data from the build '{0}' ...".
1140                          format(build["build"])))
1141             data = self._parse_tests(job, build, logs)
1142             if data is None:
1143                 logs.append(("ERROR", "Input data file from the job '{job}', "
1144                                       "build '{build}' is damaged. Skipped.".
1145                              format(job=job, build=build["build"])))
1146             else:
1147                 state = "processed"
1148
1149             try:
1150                 remove(build["file-name"])
1151             except OSError as err:
1152                 logs.append(("ERROR", "Cannot remove the file '{0}': {1}".
1153                              format(build["file-name"], repr(err))))
1154
1155         # If the time-period is defined in the specification file, remove all
1156         # files which are outside the time period.
1157         timeperiod = self._cfg.input.get("time-period", None)
1158         if timeperiod and data:
1159             now = dt.utcnow()
1160             timeperiod = timedelta(int(timeperiod))
1161             metadata = data.get("metadata", None)
1162             if metadata:
1163                 generated = metadata.get("generated", None)
1164                 if generated:
1165                     generated = dt.strptime(generated, "%Y%m%d %H:%M")
1166                     if (now - generated) > timeperiod:
1167                         # Remove the data and the file:
1168                         state = "removed"
1169                         data = None
1170                         logs.append(
1171                             ("INFO",
1172                              "    The build {job}/{build} is outdated, will be "
1173                              "removed".format(job=job, build=build["build"])))
1174                         file_name = self._cfg.input["file-name"]
1175                         full_name = join(
1176                             self._cfg.environment["paths"]["DIR[WORKING,DATA]"],
1177                             "{job}{sep}{build}{sep}{name}".
1178                                 format(job=job,
1179                                        sep=SEPARATOR,
1180                                        build=build["build"],
1181                                        name=file_name))
1182                         try:
1183                             remove(full_name)
1184                             logs.append(("INFO",
1185                                          "    The file {name} has been removed".
1186                                          format(name=full_name)))
1187                         except OSError as err:
1188                             logs.append(("ERROR",
1189                                         "Cannot remove the file '{0}': {1}".
1190                                         format(full_name, repr(err))))
1191
1192         logs.append(("INFO", "  Done."))
1193
1194         result = {
1195             "data": data,
1196             "state": state,
1197             "job": job,
1198             "build": build,
1199             "logs": logs
1200         }
1201         data_queue.put(result)
1202
1203     def download_and_parse_data(self, repeat=1):
1204         """Download the input data files, parse input data from input files and
1205         store in pandas' Series.
1206
1207         :param repeat: Repeat the download specified number of times if not
1208             successful.
1209         :type repeat: int
1210         """
1211
1212         logging.info("Downloading and parsing input files ...")
1213
1214         work_queue = multiprocessing.JoinableQueue()
1215         manager = multiprocessing.Manager()
1216         data_queue = manager.Queue()
1217         cpus = multiprocessing.cpu_count()
1218
1219         workers = list()
1220         for cpu in range(cpus):
1221             worker = Worker(work_queue,
1222                             data_queue,
1223                             self._download_and_parse_build)
1224             worker.daemon = True
1225             worker.start()
1226             workers.append(worker)
1227             os.system("taskset -p -c {0} {1} > /dev/null 2>&1".
1228                       format(cpu, worker.pid))
1229
1230         for job, builds in self._cfg.builds.items():
1231             for build in builds:
1232                 work_queue.put((job, build, repeat))
1233
1234         work_queue.join()
1235
1236         logging.info("Done.")
1237
1238         while not data_queue.empty():
1239             result = data_queue.get()
1240
1241             job = result["job"]
1242             build_nr = result["build"]["build"]
1243
1244             if result["data"]:
1245                 data = result["data"]
1246                 build_data = pd.Series({
1247                     "metadata": pd.Series(data["metadata"].values(),
1248                                           index=data["metadata"].keys()),
1249                     "suites": pd.Series(data["suites"].values(),
1250                                         index=data["suites"].keys()),
1251                     "tests": pd.Series(data["tests"].values(),
1252                                        index=data["tests"].keys())})
1253
1254                 if self._input_data.get(job, None) is None:
1255                     self._input_data[job] = pd.Series()
1256                 self._input_data[job][str(build_nr)] = build_data
1257
1258                 self._cfg.set_input_file_name(job, build_nr,
1259                                               result["build"]["file-name"])
1260
1261             self._cfg.set_input_state(job, build_nr, result["state"])
1262
1263             for item in result["logs"]:
1264                 if item[0] == "INFO":
1265                     logging.info(item[1])
1266                 elif item[0] == "ERROR":
1267                     logging.error(item[1])
1268                 elif item[0] == "DEBUG":
1269                     logging.debug(item[1])
1270                 elif item[0] == "CRITICAL":
1271                     logging.critical(item[1])
1272                 elif item[0] == "WARNING":
1273                     logging.warning(item[1])
1274
1275         del data_queue
1276
1277         # Terminate all workers
1278         for worker in workers:
1279             worker.terminate()
1280             worker.join()
1281
1282         logging.info("Done.")
1283
1284     @staticmethod
1285     def _end_of_tag(tag_filter, start=0, closer="'"):
1286         """Return the index of character in the string which is the end of tag.
1287
1288         :param tag_filter: The string where the end of tag is being searched.
1289         :param start: The index where the searching is stated.
1290         :param closer: The character which is the tag closer.
1291         :type tag_filter: str
1292         :type start: int
1293         :type closer: str
1294         :returns: The index of the tag closer.
1295         :rtype: int
1296         """
1297
1298         try:
1299             idx_opener = tag_filter.index(closer, start)
1300             return tag_filter.index(closer, idx_opener + 1)
1301         except ValueError:
1302             return None
1303
1304     @staticmethod
1305     def _condition(tag_filter):
1306         """Create a conditional statement from the given tag filter.
1307
1308         :param tag_filter: Filter based on tags from the element specification.
1309         :type tag_filter: str
1310         :returns: Conditional statement which can be evaluated.
1311         :rtype: str
1312         """
1313
1314         index = 0
1315         while True:
1316             index = InputData._end_of_tag(tag_filter, index)
1317             if index is None:
1318                 return tag_filter
1319             index += 1
1320             tag_filter = tag_filter[:index] + " in tags" + tag_filter[index:]
1321
1322     def filter_data(self, element, params=None, data_set="tests",
1323                     continue_on_error=False):
1324         """Filter required data from the given jobs and builds.
1325
1326         The output data structure is:
1327
1328         - job 1
1329           - build 1
1330             - test (or suite) 1 ID:
1331               - param 1
1332               - param 2
1333               ...
1334               - param n
1335             ...
1336             - test (or suite) n ID:
1337             ...
1338           ...
1339           - build n
1340         ...
1341         - job n
1342
1343         :param element: Element which will use the filtered data.
1344         :param params: Parameters which will be included in the output. If None,
1345         all parameters are included.
1346         :param data_set: The set of data to be filtered: tests, suites,
1347         metadata.
1348         :param continue_on_error: Continue if there is error while reading the
1349         data. The Item will be empty then
1350         :type element: pandas.Series
1351         :type params: list
1352         :type data_set: str
1353         :type continue_on_error: bool
1354         :returns: Filtered data.
1355         :rtype pandas.Series
1356         """
1357
1358         try:
1359             if element["filter"] in ("all", "template"):
1360                 cond = "True"
1361             else:
1362                 cond = InputData._condition(element["filter"])
1363             logging.debug("   Filter: {0}".format(cond))
1364         except KeyError:
1365             logging.error("  No filter defined.")
1366             return None
1367
1368         if params is None:
1369             params = element.get("parameters", None)
1370             if params:
1371                 params.append("type")
1372
1373         data = pd.Series()
1374         try:
1375             for job, builds in element["data"].items():
1376                 data[job] = pd.Series()
1377                 for build in builds:
1378                     data[job][str(build)] = pd.Series()
1379                     try:
1380                         data_iter = self.data[job][str(build)][data_set].\
1381                             iteritems()
1382                     except KeyError:
1383                         if continue_on_error:
1384                             continue
1385                         else:
1386                             return None
1387                     for test_ID, test_data in data_iter:
1388                         if eval(cond, {"tags": test_data.get("tags", "")}):
1389                             data[job][str(build)][test_ID] = pd.Series()
1390                             if params is None:
1391                                 for param, val in test_data.items():
1392                                     data[job][str(build)][test_ID][param] = val
1393                             else:
1394                                 for param in params:
1395                                     try:
1396                                         data[job][str(build)][test_ID][param] =\
1397                                             test_data[param]
1398                                     except KeyError:
1399                                         data[job][str(build)][test_ID][param] =\
1400                                             "No Data"
1401             return data
1402
1403         except (KeyError, IndexError, ValueError) as err:
1404             logging.error("   Missing mandatory parameter in the element "
1405                           "specification: {0}".format(err))
1406             return None
1407         except AttributeError:
1408             return None
1409         except SyntaxError:
1410             logging.error("   The filter '{0}' is not correct. Check if all "
1411                           "tags are enclosed by apostrophes.".format(cond))
1412             return None
1413
1414     @staticmethod
1415     def merge_data(data):
1416         """Merge data from more jobs and builds to a simple data structure.
1417
1418         The output data structure is:
1419
1420         - test (suite) 1 ID:
1421           - param 1
1422           - param 2
1423           ...
1424           - param n
1425         ...
1426         - test (suite) n ID:
1427         ...
1428
1429         :param data: Data to merge.
1430         :type data: pandas.Series
1431         :returns: Merged data.
1432         :rtype: pandas.Series
1433         """
1434
1435         logging.info("    Merging data ...")
1436
1437         merged_data = pd.Series()
1438         for _, builds in data.iteritems():
1439             for _, item in builds.iteritems():
1440                 for ID, item_data in item.iteritems():
1441                     merged_data[ID] = item_data
1442
1443         return merged_data