0a9d985a88743fc5ab8af1b8484e991b8d61b061
[csit.git] / resources / tools / presentation / utils.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """General purpose utilities.
15 """
16
17 import multiprocessing
18 import subprocess
19 import numpy as np
20 import pandas as pd
21 import logging
22
23 from os import walk, makedirs, environ
24 from os.path import join, isdir
25 from shutil import move, Error
26 from math import sqrt
27
28 from errors import PresentationError
29
30
31 def mean(items):
32     """Calculate mean value from the items.
33
34     :param items: Mean value is calculated from these items.
35     :type items: list
36     :returns: MEan value.
37     :rtype: float
38     """
39
40     return float(sum(items)) / len(items)
41
42
43 def stdev(items):
44     """Calculate stdev from the items.
45
46     :param items: Stdev is calculated from these items.
47     :type items: list
48     :returns: Stdev.
49     :rtype: float
50     """
51
52     avg = mean(items)
53     variance = [(x - avg) ** 2 for x in items]
54     stddev = sqrt(mean(variance))
55     return stddev
56
57
58 def relative_change(nr1, nr2):
59     """Compute relative change of two values.
60
61     :param nr1: The first number.
62     :param nr2: The second number.
63     :type nr1: float
64     :type nr2: float
65     :returns: Relative change of nr1.
66     :rtype: float
67     """
68
69     return float(((nr2 - nr1) / nr1) * 100)
70
71
72 def remove_outliers(input_list, outlier_const=1.5, window=14):
73     """Return list with outliers removed, using split_outliers.
74
75     :param input_list: Data from which the outliers will be removed.
76     :param outlier_const: Outlier constant.
77     :param window: How many preceding values to take into account.
78     :type input_list: list of floats
79     :type outlier_const: float
80     :type window: int
81     :returns: The input list without outliers.
82     :rtype: list of floats
83     """
84
85     data = np.array(input_list)
86     upper_quartile = np.percentile(data, 75)
87     lower_quartile = np.percentile(data, 25)
88     iqr = (upper_quartile - lower_quartile) * outlier_const
89     quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
90     result_lst = list()
91     for y in input_list:
92         if quartile_set[0] <= y <= quartile_set[1]:
93             result_lst.append(y)
94     return result_lst
95
96
97 def split_outliers(input_series, outlier_const=1.5, window=14):
98     """Go through the input data and generate two pandas series:
99     - input data with outliers replaced by NAN
100     - outliers.
101     The function uses IQR to detect outliers.
102
103     :param input_series: Data to be examined for outliers.
104     :param outlier_const: Outlier constant.
105     :param window: How many preceding values to take into account.
106     :type input_series: pandas.Series
107     :type outlier_const: float
108     :type window: int
109     :returns: Input data with NAN outliers and Outliers.
110     :rtype: (pandas.Series, pandas.Series)
111     """
112
113     list_data = list(input_series.items())
114     head_size = min(window, len(list_data))
115     head_list = list_data[:head_size]
116     trimmed_data = pd.Series()
117     outliers = pd.Series()
118     for item_x, item_y in head_list:
119         item_pd = pd.Series([item_y, ], index=[item_x, ])
120         trimmed_data = trimmed_data.append(item_pd)
121     for index, (item_x, item_y) in list(enumerate(list_data))[head_size:]:
122         y_rolling_list = [y for (x, y) in list_data[index - head_size:index]]
123         y_rolling_array = np.array(y_rolling_list)
124         q1 = np.percentile(y_rolling_array, 25)
125         q3 = np.percentile(y_rolling_array, 75)
126         iqr = (q3 - q1) * outlier_const
127         low = q1 - iqr
128         item_pd = pd.Series([item_y, ], index=[item_x, ])
129         if low <= item_y:
130             trimmed_data = trimmed_data.append(item_pd)
131         else:
132             outliers = outliers.append(item_pd)
133             nan_pd = pd.Series([np.nan, ], index=[item_x, ])
134             trimmed_data = trimmed_data.append(nan_pd)
135
136     return trimmed_data, outliers
137
138
139 def get_files(path, extension=None, full_path=True):
140     """Generates the list of files to process.
141
142     :param path: Path to files.
143     :param extension: Extension of files to process. If it is the empty string,
144         all files will be processed.
145     :param full_path: If True, the files with full path are generated.
146     :type path: str
147     :type extension: str
148     :type full_path: bool
149     :returns: List of files to process.
150     :rtype: list
151     """
152
153     file_list = list()
154     for root, _, files in walk(path):
155         for filename in files:
156             if extension:
157                 if filename.endswith(extension):
158                     if full_path:
159                         file_list.append(join(root, filename))
160                     else:
161                         file_list.append(filename)
162             else:
163                 file_list.append(join(root, filename))
164
165     return file_list
166
167
168 def get_rst_title_char(level):
169     """Return character used for the given title level in rst files.
170
171     :param level: Level of the title.
172     :type: int
173     :returns: Character used for the given title level in rst files.
174     :rtype: str
175     """
176     chars = ('=', '-', '`', "'", '.', '~', '*', '+', '^')
177     if level < len(chars):
178         return chars[level]
179     else:
180         return chars[-1]
181
182
183 def execute_command(cmd):
184     """Execute the command in a subprocess and log the stdout and stderr.
185
186     :param cmd: Command to execute.
187     :type cmd: str
188     :returns: Return code of the executed command.
189     :rtype: int
190     """
191
192     env = environ.copy()
193     proc = subprocess.Popen(
194         [cmd],
195         stdout=subprocess.PIPE,
196         stderr=subprocess.PIPE,
197         shell=True,
198         env=env)
199
200     stdout, stderr = proc.communicate()
201
202     if stdout:
203         logging.info(stdout)
204     if stderr:
205         logging.info(stderr)
206
207     if proc.returncode != 0:
208         logging.error("    Command execution failed.")
209     return proc.returncode, stdout, stderr
210
211
212 def get_last_successful_build_number(jenkins_url, job_name):
213     """Get the number of the last successful build of the given job.
214
215     :param jenkins_url: Jenkins URL.
216     :param job_name: Job name.
217     :type jenkins_url: str
218     :type job_name: str
219     :returns: The build number as a string.
220     :rtype: str
221     """
222
223     url = "{}/{}/lastSuccessfulBuild/buildNumber".format(jenkins_url, job_name)
224     cmd = "wget -qO- {url}".format(url=url)
225
226     return execute_command(cmd)
227
228
229 def get_last_completed_build_number(jenkins_url, job_name):
230     """Get the number of the last completed build of the given job.
231
232     :param jenkins_url: Jenkins URL.
233     :param job_name: Job name.
234     :type jenkins_url: str
235     :type job_name: str
236     :returns: The build number as a string.
237     :rtype: str
238     """
239
240     url = "{}/{}/lastCompletedBuild/buildNumber".format(jenkins_url, job_name)
241     cmd = "wget -qO- {url}".format(url=url)
242
243     return execute_command(cmd)
244
245
246 def archive_input_data(spec):
247     """Archive the report.
248
249     :param spec: Specification read from the specification file.
250     :type spec: Specification
251     :raises PresentationError: If it is not possible to archive the input data.
252     """
253
254     logging.info("    Archiving the input data files ...")
255
256     extension = spec.input["file-format"]
257     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
258                            extension=extension)
259     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
260     logging.info("      Destination: {0}".format(dst))
261
262     try:
263         if not isdir(dst):
264             makedirs(dst)
265
266         for data_file in data_files:
267             logging.info("      Moving the file: {0} ...".format(data_file))
268             move(data_file, dst)
269
270     except (Error, OSError) as err:
271         raise PresentationError("Not possible to archive the input data.",
272                                 str(err))
273
274     logging.info("    Done.")
275
276
277 def classify_anomalies(data, window):
278     """Evaluates if the sample value is an outlier, regression, normal or
279     progression compared to the previous data within the window.
280     We use the intervals defined as:
281     - regress: less than trimmed moving median - 3 * stdev
282     - normal: between trimmed moving median - 3 * stdev and median + 3 * stdev
283     - progress: more than trimmed moving median + 3 * stdev
284     where stdev is trimmed moving standard deviation.
285
286     :param data: Full data set with the outliers replaced by nan.
287     :param window: Window size used to calculate moving average and moving
288         stdev.
289     :type data: pandas.Series
290     :type window: int
291     :returns: Evaluated results.
292     :rtype: list
293     """
294
295     if data.size < 3:
296         return None
297
298     win_size = data.size if data.size < window else window
299     tmm = data.rolling(window=win_size, min_periods=2).median()
300     tmstd = data.rolling(window=win_size, min_periods=2).std()
301
302     classification = ["normal", ]
303     first = True
304     for build, value in data.iteritems():
305         if first:
306             first = False
307             continue
308         if np.isnan(value) or np.isnan(tmm[build]) or np.isnan(tmstd[build]):
309             classification.append("outlier")
310         elif value < (tmm[build] - 3 * tmstd[build]):
311             classification.append("regression")
312         elif value > (tmm[build] + 3 * tmstd[build]):
313             classification.append("progression")
314         else:
315             classification.append("normal")
316     return classification
317
318
319 class Worker(multiprocessing.Process):
320     """Worker class used to process tasks in separate parallel processes.
321     """
322
323     def __init__(self, work_queue, data_queue, func):
324         """Initialization.
325
326         :param work_queue: Queue with items to process.
327         :param data_queue: Shared memory between processes. Queue which keeps
328             the result data. This data is then read by the main process and used
329             in further processing.
330         :param func: Function which is executed by the worker.
331         :type work_queue: multiprocessing.JoinableQueue
332         :type data_queue: multiprocessing.Manager().Queue()
333         :type func: Callable object
334         """
335         super(Worker, self).__init__()
336         self._work_queue = work_queue
337         self._data_queue = data_queue
338         self._func = func
339
340     def run(self):
341         """Method representing the process's activity.
342         """
343
344         while True:
345             try:
346                 self.process(self._work_queue.get())
347             finally:
348                 self._work_queue.task_done()
349
350     def process(self, item_to_process):
351         """Method executed by the runner.
352
353         :param item_to_process: Data to be processed by the function.
354         :type item_to_process: tuple
355         """
356         self._func(self.pid, self._data_queue, *item_to_process)