a4e24663b50f17b1a7ed7f9947878dba1098b9e2
[csit.git] / resources / tools / presentation / new / utils.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """General purpose utilities.
15 """
16
17 import multiprocessing
18 import subprocess
19 import numpy as np
20 import pandas as pd
21 import logging
22
23 from os import walk, makedirs, environ
24 from os.path import join, isdir
25 from shutil import move, Error
26 from math import sqrt
27
28 from errors import PresentationError
29 from jumpavg.BitCountingClassifier import BitCountingClassifier
30
31
32 def mean(items):
33     """Calculate mean value from the items.
34
35     :param items: Mean value is calculated from these items.
36     :type items: list
37     :returns: MEan value.
38     :rtype: float
39     """
40
41     return float(sum(items)) / len(items)
42
43
44 def stdev(items):
45     """Calculate stdev from the items.
46
47     :param items: Stdev is calculated from these items.
48     :type items: list
49     :returns: Stdev.
50     :rtype: float
51     """
52
53     avg = mean(items)
54     variance = [(x - avg) ** 2 for x in items]
55     stddev = sqrt(mean(variance))
56     return stddev
57
58
59 def relative_change(nr1, nr2):
60     """Compute relative change of two values.
61
62     :param nr1: The first number.
63     :param nr2: The second number.
64     :type nr1: float
65     :type nr2: float
66     :returns: Relative change of nr1.
67     :rtype: float
68     """
69
70     return float(((nr2 - nr1) / nr1) * 100)
71
72
73 def get_files(path, extension=None, full_path=True):
74     """Generates the list of files to process.
75
76     :param path: Path to files.
77     :param extension: Extension of files to process. If it is the empty string,
78         all files will be processed.
79     :param full_path: If True, the files with full path are generated.
80     :type path: str
81     :type extension: str
82     :type full_path: bool
83     :returns: List of files to process.
84     :rtype: list
85     """
86
87     file_list = list()
88     for root, _, files in walk(path):
89         for filename in files:
90             if extension:
91                 if filename.endswith(extension):
92                     if full_path:
93                         file_list.append(join(root, filename))
94                     else:
95                         file_list.append(filename)
96             else:
97                 file_list.append(join(root, filename))
98
99     return file_list
100
101
102 def get_rst_title_char(level):
103     """Return character used for the given title level in rst files.
104
105     :param level: Level of the title.
106     :type: int
107     :returns: Character used for the given title level in rst files.
108     :rtype: str
109     """
110     chars = ('=', '-', '`', "'", '.', '~', '*', '+', '^')
111     if level < len(chars):
112         return chars[level]
113     else:
114         return chars[-1]
115
116
117 def execute_command(cmd):
118     """Execute the command in a subprocess and log the stdout and stderr.
119
120     :param cmd: Command to execute.
121     :type cmd: str
122     :returns: Return code of the executed command.
123     :rtype: int
124     """
125
126     env = environ.copy()
127     proc = subprocess.Popen(
128         [cmd],
129         stdout=subprocess.PIPE,
130         stderr=subprocess.PIPE,
131         shell=True,
132         env=env)
133
134     stdout, stderr = proc.communicate()
135
136     if stdout:
137         logging.info(stdout)
138     if stderr:
139         logging.info(stderr)
140
141     if proc.returncode != 0:
142         logging.error("    Command execution failed.")
143     return proc.returncode, stdout, stderr
144
145
146 def get_last_successful_build_number(jenkins_url, job_name):
147     """Get the number of the last successful build of the given job.
148
149     :param jenkins_url: Jenkins URL.
150     :param job_name: Job name.
151     :type jenkins_url: str
152     :type job_name: str
153     :returns: The build number as a string.
154     :rtype: str
155     """
156
157     url = "{}/{}/lastSuccessfulBuild/buildNumber".format(jenkins_url, job_name)
158     cmd = "wget -qO- {url}".format(url=url)
159
160     return execute_command(cmd)
161
162
163 def get_last_completed_build_number(jenkins_url, job_name):
164     """Get the number of the last completed build of the given job.
165
166     :param jenkins_url: Jenkins URL.
167     :param job_name: Job name.
168     :type jenkins_url: str
169     :type job_name: str
170     :returns: The build number as a string.
171     :rtype: str
172     """
173
174     url = "{}/{}/lastCompletedBuild/buildNumber".format(jenkins_url, job_name)
175     cmd = "wget -qO- {url}".format(url=url)
176
177     return execute_command(cmd)
178
179
180 def archive_input_data(spec):
181     """Archive the report.
182
183     :param spec: Specification read from the specification file.
184     :type spec: Specification
185     :raises PresentationError: If it is not possible to archive the input data.
186     """
187
188     logging.info("    Archiving the input data files ...")
189
190     extension = spec.input["file-format"]
191     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
192                            extension=extension)
193     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
194     logging.info("      Destination: {0}".format(dst))
195
196     try:
197         if not isdir(dst):
198             makedirs(dst)
199
200         for data_file in data_files:
201             logging.info("      Moving the file: {0} ...".format(data_file))
202             move(data_file, dst)
203
204     except (Error, OSError) as err:
205         raise PresentationError("Not possible to archive the input data.",
206                                 str(err))
207
208     logging.info("    Done.")
209
210
211 def classify_anomalies(data):
212     """Process the data and return anomalies and trending values.
213
214     Gather data into groups with average as trend value.
215     Decorate values within groups to be normal,
216     the first value of changed average as a regression, or a progression.
217
218     :param data: Full data set with unavailable samples replaced by nan.
219     :type data: pandas.Series
220     :returns: Classification and trend values
221     :rtype: 2-tuple, list of strings and list of floats
222     """
223     # Nan mean something went wrong.
224     # Use 0.0 to cause that being reported as a severe regression.
225     bare_data = [0.0 if np.isnan(sample) else sample
226                  for _, sample in data.iteritems()]
227     # TODO: Put analogous iterator into jumpavg library.
228     groups = BitCountingClassifier().classify(bare_data)
229     groups.reverse()  # Just to use .pop() for FIFO.
230     classification = []
231     avgs = []
232     active_group = None
233     values_left = 0
234     avg = 0.0
235     for _, sample in data.iteritems():
236         if np.isnan(sample):
237             classification.append("outlier")
238             avgs.append(sample)
239             continue
240         if values_left < 1 or active_group is None:
241             values_left = 0
242             while values_left < 1:  # Ignore empty groups (should not happen).
243                 active_group = groups.pop()
244                 values_left = len(active_group.values)
245             avg = active_group.metadata.avg
246             classification.append(active_group.metadata.classification)
247             avgs.append(avg)
248             values_left -= 1
249             continue
250         classification.append("normal")
251         avgs.append(avg)
252         values_left -= 1
253     return classification, avgs
254
255
256 class Worker(multiprocessing.Process):
257     """Worker class used to process tasks in separate parallel processes.
258     """
259
260     def __init__(self, work_queue, data_queue, func):
261         """Initialization.
262
263         :param work_queue: Queue with items to process.
264         :param data_queue: Shared memory between processes. Queue which keeps
265             the result data. This data is then read by the main process and used
266             in further processing.
267         :param func: Function which is executed by the worker.
268         :type work_queue: multiprocessing.JoinableQueue
269         :type data_queue: multiprocessing.Manager().Queue()
270         :type func: Callable object
271         """
272         super(Worker, self).__init__()
273         self._work_queue = work_queue
274         self._data_queue = data_queue
275         self._func = func
276
277     def run(self):
278         """Method representing the process's activity.
279         """
280
281         while True:
282             try:
283                 self.process(self._work_queue.get())
284             finally:
285                 self._work_queue.task_done()
286
287     def process(self, item_to_process):
288         """Method executed by the runner.
289
290         :param item_to_process: Data to be processed by the function.
291         :type item_to_process: tuple
292         """
293         self._func(self.pid, self._data_queue, *item_to_process)