83f4f6249b9d78fb2a9a6055ebb629d2fd01d790
[csit.git] / resources / tools / presentation / new / utils.py
1 # Copyright (c) 2018 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """General purpose utilities.
15 """
16
17 import multiprocessing
18 import subprocess
19 import numpy as np
20 import pandas as pd
21 import logging
22
23 from os import walk, makedirs, environ
24 from os.path import join, isdir
25 from shutil import move, Error
26 from math import sqrt
27
28 from errors import PresentationError
29 from jumpavg.BitCountingClassifier import BitCountingClassifier
30
31
32 def mean(items):
33     """Calculate mean value from the items.
34
35     :param items: Mean value is calculated from these items.
36     :type items: list
37     :returns: MEan value.
38     :rtype: float
39     """
40
41     return float(sum(items)) / len(items)
42
43
44 def stdev(items):
45     """Calculate stdev from the items.
46
47     :param items: Stdev is calculated from these items.
48     :type items: list
49     :returns: Stdev.
50     :rtype: float
51     """
52
53     avg = mean(items)
54     variance = [(x - avg) ** 2 for x in items]
55     stddev = sqrt(mean(variance))
56     return stddev
57
58
59 def relative_change(nr1, nr2):
60     """Compute relative change of two values.
61
62     :param nr1: The first number.
63     :param nr2: The second number.
64     :type nr1: float
65     :type nr2: float
66     :returns: Relative change of nr1.
67     :rtype: float
68     """
69
70     return float(((nr2 - nr1) / nr1) * 100)
71
72
73 def get_files(path, extension=None, full_path=True):
74     """Generates the list of files to process.
75
76     :param path: Path to files.
77     :param extension: Extension of files to process. If it is the empty string,
78         all files will be processed.
79     :param full_path: If True, the files with full path are generated.
80     :type path: str
81     :type extension: str
82     :type full_path: bool
83     :returns: List of files to process.
84     :rtype: list
85     """
86
87     file_list = list()
88     for root, _, files in walk(path):
89         for filename in files:
90             if extension:
91                 if filename.endswith(extension):
92                     if full_path:
93                         file_list.append(join(root, filename))
94                     else:
95                         file_list.append(filename)
96             else:
97                 file_list.append(join(root, filename))
98
99     return file_list
100
101
102 def get_rst_title_char(level):
103     """Return character used for the given title level in rst files.
104
105     :param level: Level of the title.
106     :type: int
107     :returns: Character used for the given title level in rst files.
108     :rtype: str
109     """
110     chars = ('=', '-', '`', "'", '.', '~', '*', '+', '^')
111     if level < len(chars):
112         return chars[level]
113     else:
114         return chars[-1]
115
116
117 def execute_command(cmd):
118     """Execute the command in a subprocess and log the stdout and stderr.
119
120     :param cmd: Command to execute.
121     :type cmd: str
122     :returns: Return code of the executed command.
123     :rtype: int
124     """
125
126     env = environ.copy()
127     proc = subprocess.Popen(
128         [cmd],
129         stdout=subprocess.PIPE,
130         stderr=subprocess.PIPE,
131         shell=True,
132         env=env)
133
134     stdout, stderr = proc.communicate()
135
136     if stdout:
137         logging.info(stdout)
138     if stderr:
139         logging.info(stderr)
140
141     if proc.returncode != 0:
142         logging.error("    Command execution failed.")
143     return proc.returncode, stdout, stderr
144
145
146 def get_last_successful_build_number(jenkins_url, job_name):
147     """Get the number of the last successful build of the given job.
148
149     :param jenkins_url: Jenkins URL.
150     :param job_name: Job name.
151     :type jenkins_url: str
152     :type job_name: str
153     :returns: The build number as a string.
154     :rtype: str
155     """
156
157     url = "{}/{}/lastSuccessfulBuild/buildNumber".format(jenkins_url, job_name)
158     cmd = "wget -qO- {url}".format(url=url)
159
160     return execute_command(cmd)
161
162
163 def get_last_completed_build_number(jenkins_url, job_name):
164     """Get the number of the last completed build of the given job.
165
166     :param jenkins_url: Jenkins URL.
167     :param job_name: Job name.
168     :type jenkins_url: str
169     :type job_name: str
170     :returns: The build number as a string.
171     :rtype: str
172     """
173
174     url = "{}/{}/lastCompletedBuild/buildNumber".format(jenkins_url, job_name)
175     cmd = "wget -qO- {url}".format(url=url)
176
177     return execute_command(cmd)
178
179
180 def archive_input_data(spec):
181     """Archive the report.
182
183     :param spec: Specification read from the specification file.
184     :type spec: Specification
185     :raises PresentationError: If it is not possible to archive the input data.
186     """
187
188     logging.info("    Archiving the input data files ...")
189
190     extension = spec.input["file-format"]
191     data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
192                            extension=extension)
193     dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
194     logging.info("      Destination: {0}".format(dst))
195
196     try:
197         if not isdir(dst):
198             makedirs(dst)
199
200         for data_file in data_files:
201             logging.info("      Moving the file: {0} ...".format(data_file))
202             move(data_file, dst)
203
204     except (Error, OSError) as err:
205         raise PresentationError("Not possible to archive the input data.",
206                                 str(err))
207
208     logging.info("    Done.")
209
210
211 def classify_anomalies(data):
212     """Process the data and return anomalies and trending values.
213
214     Gathers data into groups with common trend value.
215     Decorates first value in the group to be an outlier, regression,
216     normal or progression.
217
218     :param data: Full data set with unavailable samples replaced by nan.
219     :type data: pandas.Series
220     :returns: Classification and trend values
221     :rtype: 2-tuple, list of strings and list of floats
222     """
223     bare_data = [sample for _, sample in data.iteritems()
224                  if not np.isnan(sample)]
225     # TODO: Put analogous iterator into jumpavg library.
226     groups = BitCountingClassifier.classify(bare_data)
227     groups.reverse()  # Just to use .pop() for FIFO.
228     classification = []
229     avgs = []
230     active_group = None
231     values_left = 0
232     avg = 0.0
233     for _, sample in data.iteritems():
234         if np.isnan(sample):
235             classification.append("outlier")
236             avgs.append(sample)
237             continue
238         if values_left < 1 or active_group is None:
239             values_left = 0
240             while values_left < 1:  # To ignore empty groups.
241                 active_group = groups.pop()
242                 values_left = len(active_group.values)
243             avg = active_group.metadata.avg
244             classification.append(active_group.metadata.classification)
245             avgs.append(avg)
246             values_left -= 1
247             continue
248         classification.append("normal")
249         avgs.append(avg)
250         values_left -= 1
251     return classification, avgs
252
253
254 class Worker(multiprocessing.Process):
255     """Worker class used to process tasks in separate parallel processes.
256     """
257
258     def __init__(self, work_queue, data_queue, func):
259         """Initialization.
260
261         :param work_queue: Queue with items to process.
262         :param data_queue: Shared memory between processes. Queue which keeps
263             the result data. This data is then read by the main process and used
264             in further processing.
265         :param func: Function which is executed by the worker.
266         :type work_queue: multiprocessing.JoinableQueue
267         :type data_queue: multiprocessing.Manager().Queue()
268         :type func: Callable object
269         """
270         super(Worker, self).__init__()
271         self._work_queue = work_queue
272         self._data_queue = data_queue
273         self._func = func
274
275     def run(self):
276         """Method representing the process's activity.
277         """
278
279         while True:
280             try:
281                 self.process(self._work_queue.get())
282             finally:
283                 self._work_queue.task_done()
284
285     def process(self, item_to_process):
286         """Method executed by the runner.
287
288         :param item_to_process: Data to be processed by the function.
289         :type item_to_process: tuple
290         """
291         self._func(self.pid, self._data_queue, *item_to_process)