"""General purpose utilities.
"""
-import multiprocessing
import subprocess
import math
-import numpy as np
import logging
import csv
-import prettytable
from os import walk, makedirs, environ
from os.path import join, isdir
from shutil import move, Error
from datetime import datetime
+
+import numpy as np
+import prettytable
+
from pandas import Series
from resources.libraries.python import jumpavg
-from errors import PresentationError
+
+from pal_errors import PresentationError
def mean(items):
:returns: Character used for the given title level in rst files.
:rtype: str
"""
- chars = ('=', '-', '`', "'", '.', '~', '*', '+', '^')
+ chars = (u'=', u'-', u'`', u"'", u'.', u'~', u'*', u'+', u'^')
if level < len(chars):
return chars[level]
- else:
- return chars[-1]
+ return chars[-1]
def execute_command(cmd):
stdout, stderr = proc.communicate()
if stdout:
- logging.info(stdout)
+ logging.info(stdout.decode())
if stderr:
- logging.info(stderr)
+ logging.info(stderr.decode())
if proc.returncode != 0:
- logging.error(" Command execution failed.")
- return proc.returncode, stdout, stderr
+ logging.error(u" Command execution failed.")
+ return proc.returncode, stdout.decode(), stderr.decode()
-def get_last_successful_build_number(jenkins_url, job_name):
+def get_last_successful_build_nr(jenkins_url, job_name):
"""Get the number of the last successful build of the given job.
:param jenkins_url: Jenkins URL.
:returns: The build number as a string.
:rtype: str
"""
-
- url = "{}/{}/lastSuccessfulBuild/buildNumber".format(jenkins_url, job_name)
- cmd = "wget -qO- {url}".format(url=url)
-
- return execute_command(cmd)
+ return execute_command(
+ f"wget -qO- {jenkins_url}/{job_name}/lastSuccessfulBuild/buildNumber"
+ )
def get_last_completed_build_number(jenkins_url, job_name):
:returns: The build number as a string.
:rtype: str
"""
-
- url = "{}/{}/lastCompletedBuild/buildNumber".format(jenkins_url, job_name)
- cmd = "wget -qO- {url}".format(url=url)
-
- return execute_command(cmd)
+ return execute_command(
+ f"wget -qO- {jenkins_url}/{job_name}/lastCompletedBuild/buildNumber"
+ )
def get_build_timestamp(jenkins_url, job_name, build_nr):
:returns: The timestamp.
:rtype: datetime.datetime
"""
-
- url = "{jenkins_url}/{job_name}/{build_nr}".format(jenkins_url=jenkins_url,
- job_name=job_name,
- build_nr=build_nr)
- cmd = "wget -qO- {url}".format(url=url)
-
- timestamp = execute_command(cmd)
-
+ timestamp = execute_command(
+ f"wget -qO- {jenkins_url}/{job_name}/{build_nr}"
+ )
return datetime.fromtimestamp(timestamp/1000)
:raises PresentationError: If it is not possible to archive the input data.
"""
- logging.info(" Archiving the input data files ...")
+ logging.info(u" Archiving the input data files ...")
- extension = spec.input["arch-file-format"]
+ extension = spec.input[u"arch-file-format"]
data_files = list()
for ext in extension:
data_files.extend(get_files(
- spec.environment["paths"]["DIR[WORKING,DATA]"], extension=ext))
- dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
- logging.info(" Destination: {0}".format(dst))
+ spec.environment[u"paths"][u"DIR[WORKING,DATA]"], extension=ext))
+ dst = spec.environment[u"paths"][u"DIR[STATIC,ARCH]"]
+ logging.info(f" Destination: {dst}")
try:
if not isdir(dst):
makedirs(dst)
for data_file in data_files:
- logging.info(" Moving the file: {0} ...".format(data_file))
+ logging.info(f" Moving the file: {data_file} ...")
move(data_file, dst)
except (Error, OSError) as err:
- raise PresentationError("Not possible to archive the input data.",
- str(err))
+ raise PresentationError(
+ u"Not possible to archive the input data.",
+ repr(err)
+ )
- logging.info(" Done.")
+ logging.info(u" Done.")
def classify_anomalies(data):
# Nan means something went wrong.
# Use 0.0 to cause that being reported as a severe regression.
bare_data = [0.0 if np.isnan(sample) else sample
- for sample in data.itervalues()]
+ for sample in data.values()]
# TODO: Make BitCountingGroupList a subclass of list again?
group_list = jumpavg.classify(bare_data).group_list
group_list.reverse() # Just to use .pop() for FIFO.
active_group = None
values_left = 0
avg = 0.0
- for sample in data.itervalues():
+ for sample in data.values():
if np.isnan(sample):
- classification.append("outlier")
+ classification.append(u"outlier")
avgs.append(sample)
continue
if values_left < 1 or active_group is None:
avgs.append(avg)
values_left -= 1
continue
- classification.append("normal")
+ classification.append(u"normal")
avgs.append(avg)
values_left -= 1
return classification, avgs
-def convert_csv_to_pretty_txt(csv_file, txt_file):
+def convert_csv_to_pretty_txt(csv_file_name, txt_file_name):
"""Convert the given csv table to pretty text table.
- :param csv_file: The path to the input csv file.
- :param txt_file: The path to the output pretty text file.
- :type csv_file: str
- :type txt_file: str
+ :param csv_file_name: The path to the input csv file.
+ :param txt_file_name: The path to the output pretty text file.
+ :type csv_file_name: str
+ :type txt_file_name: str
"""
txt_table = None
- with open(csv_file, 'rb') as csv_file:
- csv_content = csv.reader(csv_file, delimiter=',', quotechar='"')
+ with open(csv_file_name, u"rt") as csv_file:
+ csv_content = csv.reader(csv_file, delimiter=u',', quotechar=u'"')
for row in csv_content:
if txt_table is None:
txt_table = prettytable.PrettyTable(row)
else:
txt_table.add_row(row)
- txt_table.align["Test case"] = "l"
+ txt_table.align[u"Test case"] = u"l"
if txt_table:
- with open(txt_file, "w") as txt_file:
+ with open(txt_file_name, u"w") as txt_file:
txt_file.write(str(txt_table))
-
-
-class Worker(multiprocessing.Process):
- """Worker class used to process tasks in separate parallel processes.
- """
-
- def __init__(self, work_queue, data_queue, func):
- """Initialization.
-
- :param work_queue: Queue with items to process.
- :param data_queue: Shared memory between processes. Queue which keeps
- the result data. This data is then read by the main process and used
- in further processing.
- :param func: Function which is executed by the worker.
- :type work_queue: multiprocessing.JoinableQueue
- :type data_queue: multiprocessing.Manager().Queue()
- :type func: Callable object
- """
- super(Worker, self).__init__()
- self._work_queue = work_queue
- self._data_queue = data_queue
- self._func = func
-
- def run(self):
- """Method representing the process's activity.
- """
-
- while True:
- try:
- self.process(self._work_queue.get())
- finally:
- self._work_queue.task_done()
-
- def process(self, item_to_process):
- """Method executed by the runner.
-
- :param item_to_process: Data to be processed by the function.
- :type item_to_process: tuple
- """
- self._func(self.pid, self._data_queue, *item_to_process)