1 # Copyright (c) 2017 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """General purpose utilities.
22 from os import walk, makedirs, environ
23 from os.path import join, isdir
24 from shutil import copy, Error
27 from errors import PresentationError
31 """Calculate mean value from the items.
33 :param items: Mean value is calculated from these items.
39 return float(sum(items)) / len(items)
43 """Calculate stdev from the items.
45 :param items: Stdev is calculated from these items.
52 variance = [(x - avg) ** 2 for x in items]
53 stddev = sqrt(mean(variance))
57 def relative_change(nr1, nr2):
58 """Compute relative change of two values.
60 :param nr1: The first number.
61 :param nr2: The second number.
64 :returns: Relative change of nr1.
68 return float(((nr2 - nr1) / nr1) * 100)
71 def remove_outliers(input_list, outlier_const=1.5, window=14):
72 """Return list with outliers removed, using split_outliers.
74 :param input_list: Data from which the outliers will be removed.
75 :param outlier_const: Outlier constant.
76 :param window: How many preceding values to take into account.
77 :type input_list: list of floats
78 :type outlier_const: float
80 :returns: The input list without outliers.
81 :rtype: list of floats
84 data = np.array(input_list)
85 upper_quartile = np.percentile(data, 75)
86 lower_quartile = np.percentile(data, 25)
87 iqr = (upper_quartile - lower_quartile) * outlier_const
88 quartile_set = (lower_quartile - iqr, upper_quartile + iqr)
90 for y in data.tolist():
91 if quartile_set[0] <= y <= quartile_set[1]:
95 # input_series = pd.Series()
96 # for index, value in enumerate(input_list):
97 # item_pd = pd.Series([value, ], index=[index, ])
98 # input_series.append(item_pd)
99 # output_series, _ = split_outliers(input_series, outlier_const=outlier_const,
101 # output_list = [y for x, y in output_series.items() if not np.isnan(y)]
106 def split_outliers(input_series, outlier_const=1.5, window=14):
107 """Go through the input data and generate two pandas series:
108 - input data with outliers replaced by NAN
110 The function uses IQR to detect outliers.
112 :param input_series: Data to be examined for outliers.
113 :param outlier_const: Outlier constant.
114 :param window: How many preceding values to take into account.
115 :type input_series: pandas.Series
116 :type outlier_const: float
118 :returns: Input data with NAN outliers and Outliers.
119 :rtype: (pandas.Series, pandas.Series)
122 list_data = list(input_series.items())
123 head_size = min(window, len(list_data))
124 head_list = list_data[:head_size]
125 trimmed_data = pd.Series()
126 outliers = pd.Series()
127 for item_x, item_y in head_list:
128 item_pd = pd.Series([item_y, ], index=[item_x, ])
129 trimmed_data = trimmed_data.append(item_pd)
130 for index, (item_x, item_y) in list(enumerate(list_data))[head_size:]:
131 y_rolling_list = [y for (x, y) in list_data[index - head_size:index]]
132 y_rolling_array = np.array(y_rolling_list)
133 q1 = np.percentile(y_rolling_array, 25)
134 q3 = np.percentile(y_rolling_array, 75)
135 iqr = (q3 - q1) * outlier_const
136 low, high = q1 - iqr, q3 + iqr
137 item_pd = pd.Series([item_y, ], index=[item_x, ])
138 if low <= item_y <= high:
139 trimmed_data = trimmed_data.append(item_pd)
141 outliers = outliers.append(item_pd)
142 nan_pd = pd.Series([np.nan, ], index=[item_x, ])
143 trimmed_data = trimmed_data.append(nan_pd)
145 return trimmed_data, outliers
148 def get_files(path, extension=None, full_path=True):
149 """Generates the list of files to process.
151 :param path: Path to files.
152 :param extension: Extension of files to process. If it is the empty string,
153 all files will be processed.
154 :param full_path: If True, the files with full path are generated.
157 :type full_path: bool
158 :returns: List of files to process.
163 for root, _, files in walk(path):
164 for filename in files:
166 if filename.endswith(extension):
168 file_list.append(join(root, filename))
170 file_list.append(filename)
172 file_list.append(join(root, filename))
177 def get_rst_title_char(level):
178 """Return character used for the given title level in rst files.
180 :param level: Level of the title.
182 :returns: Character used for the given title level in rst files.
185 chars = ('=', '-', '`', "'", '.', '~', '*', '+', '^')
186 if level < len(chars):
192 def execute_command(cmd):
193 """Execute the command in a subprocess and log the stdout and stderr.
195 :param cmd: Command to execute.
197 :returns: Return code of the executed command.
202 proc = subprocess.Popen(
204 stdout=subprocess.PIPE,
205 stderr=subprocess.PIPE,
209 stdout, stderr = proc.communicate()
214 if proc.returncode != 0:
215 logging.error(" Command execution failed.")
216 return proc.returncode, stdout, stderr
219 def get_last_successful_build_number(jenkins_url, job_name):
220 """Get the number of the last successful build of the given job.
222 :param jenkins_url: Jenkins URL.
223 :param job_name: Job name.
224 :type jenkins_url: str
226 :returns: The build number as a string.
230 url = "{}/{}/lastSuccessfulBuild/buildNumber".format(jenkins_url, job_name)
231 cmd = "wget -qO- {url}".format(url=url)
233 return execute_command(cmd)
236 def get_last_completed_build_number(jenkins_url, job_name):
237 """Get the number of the last completed build of the given job.
239 :param jenkins_url: Jenkins URL.
240 :param job_name: Job name.
241 :type jenkins_url: str
243 :returns: The build number as a string.
247 url = "{}/{}/lastCompletedBuild/buildNumber".format(jenkins_url, job_name)
248 cmd = "wget -qO- {url}".format(url=url)
250 return execute_command(cmd)
253 def archive_input_data(spec):
254 """Archive the report.
256 :param spec: Specification read from the specification file.
257 :type spec: Specification
258 :raises PresentationError: If it is not possible to archive the input data.
261 logging.info(" Archiving the input data files ...")
264 extension = spec.debug["input-format"]
266 extension = spec.input["file-format"]
267 data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
269 dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
270 logging.info(" Destination: {0}".format(dst))
276 for data_file in data_files:
277 logging.info(" Copying the file: {0} ...".format(data_file))
280 except (Error, OSError) as err:
281 raise PresentationError("Not possible to archive the input data.",
284 logging.info(" Done.")