+
+
+def execute_command(cmd):
+ """Execute the command in a subprocess and log the stdout and stderr.
+
+ :param cmd: Command to execute.
+ :type cmd: str
+ :returns: Return code of the executed command.
+ :rtype: int
+ """
+
+ env = environ.copy()
+ proc = subprocess.Popen(
+ [cmd],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=True,
+ env=env)
+
+ stdout, stderr = proc.communicate()
+
+ if stdout:
+ logging.info(stdout)
+ if stderr:
+ logging.info(stderr)
+
+ if proc.returncode != 0:
+ logging.error(" Command execution failed.")
+ return proc.returncode, stdout, stderr
+
+
+def get_last_successful_build_number(jenkins_url, job_name):
+ """Get the number of the last successful build of the given job.
+
+ :param jenkins_url: Jenkins URL.
+ :param job_name: Job name.
+ :type jenkins_url: str
+ :type job_name: str
+ :returns: The build number as a string.
+ :rtype: str
+ """
+
+ url = "{}/{}/lastSuccessfulBuild/buildNumber".format(jenkins_url, job_name)
+ cmd = "wget -qO- {url}".format(url=url)
+
+ return execute_command(cmd)
+
+
+def get_last_completed_build_number(jenkins_url, job_name):
+ """Get the number of the last completed build of the given job.
+
+ :param jenkins_url: Jenkins URL.
+ :param job_name: Job name.
+ :type jenkins_url: str
+ :type job_name: str
+ :returns: The build number as a string.
+ :rtype: str
+ """
+
+ url = "{}/{}/lastCompletedBuild/buildNumber".format(jenkins_url, job_name)
+ cmd = "wget -qO- {url}".format(url=url)
+
+ return execute_command(cmd)
+
+
+def archive_input_data(spec):
+ """Archive the report.
+
+ :param spec: Specification read from the specification file.
+ :type spec: Specification
+ :raises PresentationError: If it is not possible to archive the input data.
+ """
+
+ logging.info(" Archiving the input data files ...")
+
+ extension = spec.input["file-format"]
+ data_files = get_files(spec.environment["paths"]["DIR[WORKING,DATA]"],
+ extension=extension)
+ dst = spec.environment["paths"]["DIR[STATIC,ARCH]"]
+ logging.info(" Destination: {0}".format(dst))
+
+ try:
+ if not isdir(dst):
+ makedirs(dst)
+
+ for data_file in data_files:
+ logging.info(" Moving the file: {0} ...".format(data_file))
+ move(data_file, dst)
+
+ except (Error, OSError) as err:
+ raise PresentationError("Not possible to archive the input data.",
+ str(err))
+
+ logging.info(" Done.")
+
+
+class Worker(multiprocessing.Process):
+ """Worker class used to process tasks in separate parallel processes.
+ """
+
+ def __init__(self, work_queue, data_queue, func):
+ """Initialization.
+
+ :param work_queue: Queue with items to process.
+ :param data_queue: Shared memory between processes. Queue which keeps
+ the result data. This data is then read by the main process and used
+ in further processing.
+ :param func: Function which is executed by the worker.
+ :type work_queue: multiprocessing.JoinableQueue
+ :type data_queue: multiprocessing.Manager().Queue()
+ :type func: Callable object
+ """
+ super(Worker, self).__init__()
+ self._work_queue = work_queue
+ self._data_queue = data_queue
+ self._func = func
+
+ def run(self):
+ """Method representing the process's activity.
+ """
+
+ while True:
+ try:
+ self.process(self._work_queue.get())
+ finally:
+ self._work_queue.task_done()
+
+ def process(self, item_to_process):
+ """Method executed by the runner.
+
+ :param item_to_process: Data to be processed by the function.
+ :type item_to_process: tuple
+ """
+ self._func(self.pid, self._data_queue, *item_to_process)