51bb1d0305ecf6111d10feea95a86de3312bd6db
[csit.git] / resources / tools / presentation / utils.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """General purpose utilities.
15 """
16
17 import multiprocessing
18 import subprocess
19 import numpy as np
20 import logging
21 import csv
22 import prettytable
23
24 from os import walk, makedirs, environ
25 from os.path import join, isdir
26 from shutil import move, Error
27 from math import sqrt
28
29 from errors import PresentationError
30 from jumpavg.BitCountingClassifier import BitCountingClassifier
31
32
33 def mean(items):
34     """Calculate mean value from the items.
35
36     :param items: Mean value is calculated from these items.
37     :type items: list
38     :returns: MEan value.
39     :rtype: float
40     """
41
42     return float(sum(items)) / len(items)
43
44
45 def stdev(items):
46     """Calculate stdev from the items.
47
48     :param items: Stdev is calculated from these items.
49     :type items: list
50     :returns: Stdev.
51     :rtype: float
52     """
53
54     avg = mean(items)
55     variance = [(x - avg) ** 2 for x in items]
56     stddev = sqrt(mean(variance))
57     return stddev
58
59
60 def relative_change(nr1, nr2):
61     """Compute relative change of two values.
62
63     :param nr1: The first number.
64     :param nr2: The second number.
65     :type nr1: float
66     :type nr2: float
67     :returns: Relative change of nr1.
68     :rtype: float
69     """
70
71     return float(((nr2 - nr1) / nr1) * 100)
72
73
74 def get_files(path, extension=None, full_path=True):
75     """Generates the list of files to process.
76
77     :param path: Path to files.
78     :param extension: Extension of files to process. If it is the empty string,
79         all files will be processed.
80     :param full_path: If True, the files with full path are generated.
81     :type path: str
82     :type extension: str
83     :type full_path: bool
84     :returns: List of files to process.
85     :rtype: list
86     """
87
88     file_list = list()
89     for root, _, files in walk(path):
90         for filename in files:
91             if extension:
92                 if filename.endswith(extension):
93                     if full_path:
94                         file_list.append(join(root, filename))
95                     else:
96                         file_list.append(filename)
97             else:
98                 file_list.append(join(root, filename))
99
100     return file_list
101
102
103 def get_rst_title_char(level):
104     """Return character used for the given title level in rst files.
105
106     :param level: Level of the title.
107     :type: int
108     :returns: Character used for the given title level in rst files.
109     :rtype: str
110     """
111     chars = ('=', '-', '`', "'", '.', '~', '*', '+', '^')
112     if level < len(chars):
113         return chars[level]
114     else:
115         return chars[-1]
116
117
118 def execute_command(cmd):
119     """Execute the command in a subprocess and log the stdout and stderr.
120
121     :param cmd: Command to execute.
122     :type cmd: str
123     :returns: Return code of the executed command, stdout and stderr.
124     :rtype: tuple(int, str, str)
125     """
126
127     env = environ.copy()
128     proc = subprocess.Popen(
129         [cmd],
130         stdout=subprocess.PIPE,
131         stderr=subprocess.PIPE,
132         shell=True,
133         env=env)
134
135     stdout, stderr = proc.communicate()
136
137     if stdout:
138         logging.info(stdout)
139     if stderr:
140         logging.info(stderr)
141
142     if proc.returncode != 0:
143         logging.error("    Command execution failed.")
144     return proc.returncode, stdout, stderr
145
146
147 def get_last_successful_build_number(jenkins_url, job_name):
148     """Get the number of the last successful build of the given job.
149
150     :param jenkins_url: Jenkins URL.
151     :param job_name: Job name.
152     :type jenkins_url: str
153     :type job_name: str
154     :returns: The build number as a string.
155     :rtype: str
156     """
157
158     url = "{}/{}/lastSuccessfulBuild/buildNumber".format(jenkins_url, job_name)
159     cmd = "wget -qO- {url}".format(url=url)
160
161     return execute_command(cmd)
162
163
164 def get_last_completed_build_number(jenkins_url, job_name):
165     """Get the number of the last completed build of the given job.
166
167     :param jenkins_url: Jenkins URL.
168     :param job_name: Job name.
169     :type jenkins_url: str
170     :type job_name: str
171     :returns: The build number as a string.
172     :rtype: str
173     """
174
175     url = "{}/{}/lastCompletedBuild/buildNumber".format(jenkins_url, job_name)
176     cmd = "wget -qO- {url}".format(url=url)
177
178     return execute_command(cmd)
179
180
181 def archive_input_data(spec):
182     """Archive the report.
183
184     :param spec: Specification read from the specification file.
185     :type spec: Specification
186     :raises PresentationError: If it is not possible to archive the input data.
187     """
188
189     logging.info("    Archiving the input data files ...")
190
191     extension = spec.input["file-format"]
192     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
193                            extension=extension)
194     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
195     logging.info("      Destination: {0}".format(dst))
196
197     try:
198         if not isdir(dst):
199             makedirs(dst)
200
201         for data_file in data_files:
202             logging.info("      Moving the file: {0} ...".format(data_file))
203             move(data_file, dst)
204
205     except (Error, OSError) as err:
206         raise PresentationError("Not possible to archive the input data.",
207                                 str(err))
208
209     logging.info("    Done.")
210
211
212 def classify_anomalies(data):
213     """Process the data and return anomalies and trending values.
214
215     Gather data into groups with average as trend value.
216     Decorate values within groups to be normal,
217     the first value of changed average as a regression, or a progression.
218
219     :param data: Full data set with unavailable samples replaced by nan.
220     :type data: OrderedDict
221     :returns: Classification and trend values
222     :rtype: 2-tuple, list of strings and list of floats
223     """
224     # Nan mean something went wrong.
225     # Use 0.0 to cause that being reported as a severe regression.
226     bare_data = [0.0 if np.isnan(sample.avg) else sample
227                  for _, sample in data.iteritems()]
228     # TODO: Put analogous iterator into jumpavg library.
229     groups = BitCountingClassifier().classify(bare_data)
230     groups.reverse()  # Just to use .pop() for FIFO.
231     classification = []
232     avgs = []
233     active_group = None
234     values_left = 0
235     avg = 0.0
236     for _, sample in data.iteritems():
237         if np.isnan(sample.avg):
238             classification.append("outlier")
239             avgs.append(sample.avg)
240             continue
241         if values_left < 1 or active_group is None:
242             values_left = 0
243             while values_left < 1:  # Ignore empty groups (should not happen).
244                 active_group = groups.pop()
245                 values_left = len(active_group.values)
246             avg = active_group.metadata.avg
247             classification.append(active_group.metadata.classification)
248             avgs.append(avg)
249             values_left -= 1
250             continue
251         classification.append("normal")
252         avgs.append(avg)
253         values_left -= 1
254     return classification, avgs
255
256
257 def convert_csv_to_pretty_txt(csv_file, txt_file):
258     """Convert the given csv table to pretty text table.
259
260     :param csv_file: The path to the input csv file.
261     :param txt_file: The path to the output pretty text file.
262     :type csv_file: str
263     :type txt_file: str
264     """
265
266     txt_table = None
267     with open(csv_file, 'rb') as csv_file:
268         csv_content = csv.reader(csv_file, delimiter=',', quotechar='"')
269         for row in csv_content:
270             if txt_table is None:
271                 txt_table = prettytable.PrettyTable(row)
272             else:
273                 txt_table.add_row(row)
274         txt_table.align["Test case"] = "l"
275     if txt_table:
276         with open(txt_file, "w") as txt_file:
277             txt_file.write(str(txt_table))
278
279
280 class Worker(multiprocessing.Process):
281     """Worker class used to process tasks in separate parallel processes.
282     """
283
284     def __init__(self, work_queue, data_queue, func):
285         """Initialization.
286
287         :param work_queue: Queue with items to process.
288         :param data_queue: Shared memory between processes. Queue which keeps
289             the result data. This data is then read by the main process and used
290             in further processing.
291         :param func: Function which is executed by the worker.
292         :type work_queue: multiprocessing.JoinableQueue
293         :type data_queue: multiprocessing.Manager().Queue()
294         :type func: Callable object
295         """
296         super(Worker, self).__init__()
297         self._work_queue = work_queue
298         self._data_queue = data_queue
299         self._func = func
300
301     def run(self):
302         """Method representing the process's activity.
303         """
304
305         while True:
306             try:
307                 self.process(self._work_queue.get())
308             finally:
309                 self._work_queue.task_done()
310
311     def process(self, item_to_process):
312         """Method executed by the runner.
313
314         :param item_to_process: Data to be processed by the function.
315         :type item_to_process: tuple
316         """
317         self._func(self.pid, self._data_queue, *item_to_process)