-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""
+from os.path import join, exists
+
import logging
-from yaml import load, YAMLError
from pprint import pformat
-from errors import PresentationError
-from utils import (
- get_last_successful_build_number, get_last_completed_build_number)
+from yaml import load, FullLoader, YAMLError
+
+from pal_errors import PresentationError
+from pal_utils import (
+ get_last_successful_build_nr, get_last_completed_build_number
+)
class Specification:
"""Specification of Presentation and analytics layer.
- - based on specification specified in the specification YAML file
+ - based on specification specified in the specification YAML files
- presentation and analytics layer is model driven
"""
# Tags are used in specification YAML file and replaced while the file is
# parsed.
- TAG_OPENER = "{"
- TAG_CLOSER = "}"
+ TAG_OPENER = u"{"
+ TAG_CLOSER = u"}"
- def __init__(self, cfg_file):
+ def __init__(self, cfg_dir):
"""Initialization.
- :param cfg_file: File handler for the specification YAML file.
- :type cfg_file: BinaryIO
+ :param cfg_dir: Directory with the specification files.
+ :type cfg_dir: str
"""
- self._cfg_file = cfg_file
+ self._cfg_dir = cfg_dir
self._cfg_yaml = None
- self._specification = {"environment": dict(),
- "configuration": dict(),
- "static": dict(),
- "input": dict(),
- "output": dict(),
- "tables": list(),
- "plots": list(),
- "files": list(),
- "cpta": dict()}
+ self._specification = {
+ u"environment": dict(),
+ u"data_sets": dict(),
+ u"layouts": dict(),
+ u"static": dict(),
+ u"input": dict(),
+ u"output": dict(),
+ u"tables": list(),
+ u"plots": list(),
+ u"files": list(),
+ u"cpta": dict()
+ }
@property
def specification(self):
:returns: Environment specification.
:rtype: dict
"""
- return self._specification["environment"]
+ return self._specification[u"environment"]
@property
- def configuration(self):
- """Getter - configuration.
+ def data_sets(self):
+ """Getter - data_sets.
- :returns: Configuration of PAL.
+ :returns: Data sets.
:rtype: dict
"""
- return self._specification["configuration"]
+ return self._specification[u"data_sets"]
+
+ @property
+ def layouts(self):
+ """Getter - layouts.
+
+ :returns: Layouts.
+ :rtype: dict
+ """
+ return self._specification[u"layouts"]
@property
def static(self):
:returns: Static content specification.
:rtype: dict
"""
- return self._specification["static"]
+ return self._specification[u"static"]
@property
def mapping(self):
one.
:rtype: dict
"""
- return self._specification["configuration"]["mapping"]
+ return self.environment[u"mapping"]
@property
def ignore(self):
:returns: List of ignored test cases.
:rtype: list
"""
- return self._specification["configuration"]["ignore"]
+ return self.environment[u"ignore"]
@property
def alerting(self):
"""Getter - Alerting.
+ # TODO
+
:returns: Specification of alerts.
:rtype: dict
"""
- return self._specification["configuration"]["alerting"]
+ return self.environment[u"alerting"]
@property
def input(self):
:returns: Inputs.
:rtype: dict
"""
- return self._specification["input"]
+ return self._specification[u"input"]
- @property
- def builds(self):
- """Getter - builds defined in specification.
+ @input.setter
+ def input(self, new_value):
+ """Setter - specification - inputs.
- :returns: Builds defined in the specification.
- :rtype: dict
+ :param new_value: New value to be set.
+ :type new_value: dict
"""
- return self.input["builds"]
+ self._specification[u"input"] = new_value
+
+ def add_build(self, job, build):
+ """Add a build to the list of builds if it does not exist there.
+
+ :param job: The job which run the build.
+ :param build: The build to be added.
+ :type job: str
+ :type build: dict
+ """
+ if self.input.get(job, None) is None:
+ self.input[job] = list()
+ for existing_build in self.input[job]:
+ if existing_build[u"build"] == build[u"build"]:
+ break
+ else:
+ self.input[job].append(build)
@property
def output(self):
:returns: Outputs to be generated.
:rtype: dict
"""
- return self._specification["output"]
+ return self._specification[u"output"]
@property
def tables(self):
:returns: List of specifications of tables to be generated.
:rtype: list
"""
- return self._specification["tables"]
+ return self._specification.get(u"tables", list())
@property
def plots(self):
:returns: List of specifications of plots to be generated.
:rtype: list
"""
- return self._specification["plots"]
+ return self._specification.get(u"plots", list())
@property
def files(self):
:returns: List of specifications of files to be generated.
:rtype: list
"""
- return self._specification["files"]
+ return self._specification.get(u"files", list())
@property
def cpta(self):
generated.
:returns: List of specifications of Continuous Performance Trending and
- Analysis to be generated.
+ Analysis to be generated.
:rtype: list
"""
- return self._specification["cpta"]
+ return self._specification[u"cpta"]
def set_input_state(self, job, build_nr, state):
- """Set the state of input
+ """Set the state of the input.
- :param job:
- :param build_nr:
- :param state:
- :return:
+ :param job: Job name.
+ :param build_nr: Build number.
+ :param state: The new input state.
+ :type job: str
+ :type build_nr: int
+ :type state: str
+ :raises: PresentationError if wrong job and/or build is provided.
"""
try:
- for build in self._specification["input"]["builds"][job]:
- if build["build"] == build_nr:
- build["status"] = state
+ for build in self.input[job]:
+ if build[u"build"] == build_nr:
+ build[u"status"] = state
break
else:
- raise PresentationError("Build '{}' is not defined for job '{}'"
- " in specification file.".
- format(build_nr, job))
+ raise PresentationError(
+ f"Build {build_nr} is not defined for job {job} in "
+ f"specification file."
+ )
except KeyError:
- raise PresentationError("Job '{}' and build '{}' is not defined in "
- "specification file.".format(job, build_nr))
+ raise PresentationError(
+ f"Job {job} and build {build_nr} is not defined in "
+ f"specification file."
+ )
def set_input_file_name(self, job, build_nr, file_name):
- """Set the state of input
+ """Set the file name for the input.
- :param job:
- :param build_nr:
- :param file_name:
- :return:
+ :param job: Job name.
+ :param build_nr: Build number.
+ :param file_name: The new file name.
+ :type job: str
+ :type build_nr: int
+ :type file_name: str
+ :raises: PresentationError if wrong job and/or build is provided.
"""
try:
- for build in self._specification["input"]["builds"][job]:
- if build["build"] == build_nr:
- build["file-name"] = file_name
+ for build in self.input[job]:
+ if build[u"build"] == build_nr:
+ build[u"file-name"] = file_name
break
else:
- raise PresentationError("Build '{}' is not defined for job '{}'"
- " in specification file.".
- format(build_nr, job))
+ raise PresentationError(
+ f"Build {build_nr} is not defined for job {job} in "
+ f"specification file."
+ )
except KeyError:
- raise PresentationError("Job '{}' and build '{}' is not defined in "
- "specification file.".format(job, build_nr))
+ raise PresentationError(
+ f"Job {job} and build {build_nr} is not defined in "
+ f"specification file."
+ )
def _get_build_number(self, job, build_type):
"""Get the number of the job defined by its name:
- lastCompletedBuild
:type job" str
:raises PresentationError: If it is not possible to get the build
- number.
+ number.
:returns: The build number.
:rtype: int
"""
# defined as a range <start, end>
- if build_type == "lastSuccessfulBuild":
+ if build_type == u"lastSuccessfulBuild":
# defined as a range <start, lastSuccessfulBuild>
- ret_code, build_nr, _ = get_last_successful_build_number(
- self.environment["urls"]["URL[JENKINS,CSIT]"], job)
- elif build_type == "lastCompletedBuild":
+ ret_code, build_nr, _ = get_last_successful_build_nr(
+ self.environment[u"urls"][u"URL[JENKINS,CSIT]"], job)
+ elif build_type == u"lastCompletedBuild":
# defined as a range <start, lastCompletedBuild>
ret_code, build_nr, _ = get_last_completed_build_number(
- self.environment["urls"]["URL[JENKINS,CSIT]"], job)
+ self.environment[u"urls"][u"URL[JENKINS,CSIT]"], job)
else:
- raise PresentationError("Not supported build type: '{0}'".
- format(build_type))
+ raise PresentationError(f"Not supported build type: {build_type}")
if ret_code != 0:
- raise PresentationError("Not possible to get the number of the "
- "build number.")
+ raise PresentationError(
+ f"Not possible to get the build number of {job}."
+ )
try:
build_nr = int(build_nr)
return build_nr
except ValueError as err:
- raise PresentationError("Not possible to get the number of the "
- "build number.\nReason: {0}".format(err))
+ raise PresentationError(
+ f"Not possible to get the build number of {job}. Reason:\n"
+ f"{repr(err)}"
+ )
def _get_type_index(self, item_type):
"""Get index of item type (environment, input, output, ...) in
specification YAML file.
:param item_type: Item type: Top level items in specification YAML file,
- e.g.: environment, input, output.
+ e.g.: environment, input, output.
:type item_type: str
:returns: Index of the given item type.
:rtype: int
index = 0
for item in self._cfg_yaml:
- if item["type"] == item_type:
+ if item[u"type"] == item_type:
return index
index += 1
return None
:param data: The data where the tags will be replaced by their values.
:param src_data: Data where the tags are defined. It is dictionary where
- the key is the tag and the value is the tag value. If not given, 'data'
- is used instead.
- :type data: str or dict
+ the key is the tag and the value is the tag value. If not given,
+ 'data' is used instead.
+ :type data: str, list or dict
:type src_data: dict
:returns: Data with the tags replaced.
- :rtype: str or dict
+ :rtype: str, list or dict
:raises: PresentationError if it is not possible to replace the tag or
- the data is not the supported data type (str, dict).
+ the data is not the supported data type (str, list or dict).
"""
if src_data is None:
tag = self._find_tag(data)
if tag is not None:
data = data.replace(tag, src_data[tag[1:-1]])
+ return data
- elif isinstance(data, dict):
+ if isinstance(data, list):
+ new_list = list()
+ for item in data:
+ new_list.append(self._replace_tags(item, src_data))
+ return new_list
+
+ if isinstance(data, dict):
counter = 0
for key, value in data.items():
tag = self._find_tag(value)
data[key] = value.replace(tag, src_data[tag[1:-1]])
counter += 1
except KeyError:
- raise PresentationError("Not possible to replace the "
- "tag '{}'".format(tag))
+ raise PresentationError(
+ f"Not possible to replace the tag {tag}"
+ )
if counter:
self._replace_tags(data, src_data)
- else:
- raise PresentationError("Replace tags: Not supported data type.")
+ return data
- return data
+ raise PresentationError(u"Replace tags: Not supported data type.")
def _parse_env(self):
"""Parse environment specification in the specification YAML file.
"""
- logging.info("Parsing specification file: environment ...")
+ logging.info(u"Parsing specification: ENVIRONMENT")
- idx = self._get_type_index("environment")
+ idx = self._get_type_index(u"environment")
if idx is None:
- return None
+ return
- try:
- self._specification["environment"]["configuration"] = \
- self._cfg_yaml[idx]["configuration"]
- except KeyError:
- self._specification["environment"]["configuration"] = None
+ self._specification[u"environment"][u"spec-files"] = \
+ self._cfg_yaml[idx].get(u"spec-files", None)
try:
- self._specification["environment"]["paths"] = \
- self._replace_tags(self._cfg_yaml[idx]["paths"])
+ self._specification[u"environment"][u"paths"] = \
+ self._replace_tags(self._cfg_yaml[idx][u"paths"])
except KeyError:
- self._specification["environment"]["paths"] = None
+ self._specification[u"environment"][u"paths"] = None
- try:
- self._specification["environment"]["urls"] = \
- self._cfg_yaml[idx]["urls"]
- except KeyError:
- self._specification["environment"]["urls"] = None
+ self._specification[u"environment"][u"data-sources"] = \
+ self._cfg_yaml[idx].get(u"data-sources", tuple())
+ # Add statistics:
+ for source in self._specification[u"environment"][u"data-sources"]:
+ source[u"successful-downloads"] = 0
- try:
- self._specification["environment"]["make-dirs"] = \
- self._cfg_yaml[idx]["make-dirs"]
- except KeyError:
- self._specification["environment"]["make-dirs"] = None
+ self._specification[u"environment"][u"make-dirs"] = \
+ self._cfg_yaml[idx].get(u"make-dirs", None)
- try:
- self._specification["environment"]["remove-dirs"] = \
- self._cfg_yaml[idx]["remove-dirs"]
- except KeyError:
- self._specification["environment"]["remove-dirs"] = None
+ self._specification[u"environment"][u"remove-dirs"] = \
+ self._cfg_yaml[idx].get(u"remove-dirs", None)
- try:
- self._specification["environment"]["build-dirs"] = \
- self._cfg_yaml[idx]["build-dirs"]
- except KeyError:
- self._specification["environment"]["build-dirs"] = None
+ self._specification[u"environment"][u"build-dirs"] = \
+ self._cfg_yaml[idx].get(u"build-dirs", None)
- try:
- self._specification["environment"]["testbeds"] = \
- self._cfg_yaml[idx]["testbeds"]
- except KeyError:
- self._specification["environment"]["testbeds"] = None
+ self._specification[u"environment"][u"testbeds"] = \
+ self._cfg_yaml[idx].get(u"testbeds", None)
+
+ self._specification[u"environment"][u"limits"] = \
+ self._cfg_yaml[idx].get(u"limits", None)
+
+ self._specification[u"environment"][u"urls"] = \
+ self._cfg_yaml[idx].get(u"urls", None)
- logging.info("Done.")
+ self._specification[u"environment"][u"archive-inputs"] = \
+ self._cfg_yaml[idx].get(u"archive-inputs", False)
- def _parse_configuration(self):
- """Parse configuration of PAL in the specification YAML file.
+ self._specification[u"environment"][u"reverse-input"] = \
+ self._cfg_yaml[idx].get(u"reverse-input", False)
+
+ self._specification[u"environment"][u"time-period"] = \
+ self._cfg_yaml[idx].get(u"time-period", None)
+
+ self._specification[u"environment"][u"alerting"] = \
+ self._cfg_yaml[idx].get(u"alerting", None)
+
+ self._specification[u"environment"][u"mapping-file"] = \
+ self._cfg_yaml[idx].get(u"mapping-file", None)
+
+ self._specification[u"environment"][u"ignore-list"] = \
+ self._cfg_yaml[idx].get(u"ignore-list", None)
+
+ # Mapping table:
+ self._load_mapping_table()
+
+ # Ignore list:
+ self._load_ignore_list()
+
+ def _parse_layouts(self):
+ """Parse layouts specification in the specification YAML file.
"""
- logging.info("Parsing specification file: configuration ...")
+ logging.info(u"Parsing specification: LAYOUTS")
- idx = self._get_type_index("configuration")
+ idx = self._get_type_index(u"layouts")
if idx is None:
- logging.warning("No configuration information in the specification "
- "file.")
- return None
+ return
try:
- self._specification["configuration"] = self._cfg_yaml[idx]
+ self._specification[u"layouts"] = self._cfg_yaml[idx]
+ except KeyError:
+ raise PresentationError(u"No layouts defined.")
+
+ def _parse_data_sets(self):
+ """Parse data sets specification in the specification YAML file.
+ """
+
+ logging.info(u"Parsing specification: DATA SETS")
+ idx = self._get_type_index(u"data-sets")
+ if idx is None:
+ return
+
+ try:
+ self._specification[u"data_sets"] = self._cfg_yaml[idx]
except KeyError:
- raise PresentationError("No configuration defined.")
+ raise PresentationError(u"No Data sets defined.")
- # Data sets: Replace ranges by lists
- for set_name, data_set in self.configuration["data-sets"].items():
+ # Replace ranges by lists
+ for set_name, data_set in self.data_sets.items():
if not isinstance(data_set, dict):
continue
for job, builds in data_set.items():
- if builds:
- if isinstance(builds, dict):
- build_end = builds.get("end", None)
+ if not builds:
+ continue
+ if isinstance(builds, dict):
+ build_end = builds.get(u"end", None)
+ max_builds = builds.get(u"max-builds", None)
+ reverse = builds.get(u"reverse", False)
+ try:
+ build_end = int(build_end)
+ except ValueError:
+ # defined as a range <start, build_type>
+ build_end = self._get_build_number(job, build_end)
+ builds = list(range(builds[u"start"], build_end + 1))
+ if max_builds and max_builds < len(builds):
+ builds = builds[-max_builds:]
+ if reverse:
+ builds.reverse()
+ self.data_sets[set_name][job] = builds
+ elif isinstance(builds, list):
+ for idx, item in enumerate(builds):
try:
- build_end = int(build_end)
+ builds[idx] = int(item)
except ValueError:
- # defined as a range <start, build_type>
- build_end = self._get_build_number(job, build_end)
- builds = [x for x in range(builds["start"], build_end+1)
- if x not in builds.get("skip", list())]
- self.configuration["data-sets"][set_name][job] = builds
- elif isinstance(builds, list):
- for idx, item in enumerate(builds):
- try:
- builds[idx] = int(item)
- except ValueError:
- # defined as a range <build_type>
- builds[idx] = self._get_build_number(job, item)
-
- # Data sets: add sub-sets to sets (only one level):
- for set_name, data_set in self.configuration["data-sets"].items():
+ # defined as a range <build_type>
+ builds[idx] = self._get_build_number(job, item)
+
+ # Add sub-sets to sets (only one level):
+ for set_name, data_set in self.data_sets.items():
if isinstance(data_set, list):
new_set = dict()
for item in data_set:
try:
- for key, val in self.configuration["data-sets"][item].\
- items():
+ for key, val in self.data_sets[item].items():
new_set[key] = val
except KeyError:
raise PresentationError(
- "Data set {0} is not defined in "
- "the configuration section.".format(item))
- self.configuration["data-sets"][set_name] = new_set
+ f"Data set {item} is not defined."
+ )
+ self.data_sets[set_name] = new_set
- # Mapping table:
- mapping = None
- mapping_file_name = self._specification["configuration"].\
- get("mapping-file", None)
+ def _load_mapping_table(self):
+ """Load a mapping table if it is specified. If not, use empty dict.
+ """
+
+ mapping_file_name = self.environment.get(u"mapping-file", None)
if mapping_file_name:
- logging.debug("Mapping file: '{0}'".format(mapping_file_name))
try:
- with open(mapping_file_name, 'r') as mfile:
- mapping = load(mfile)
- logging.debug("Loaded mapping table:\n{0}".format(mapping))
+ with open(mapping_file_name, u'r') as mfile:
+ mapping = load(mfile, Loader=FullLoader)
+ # Make sure everything is lowercase
+ self.environment[u"mapping"] = \
+ {key.lower(): val.lower() for key, val in
+ mapping.items()}
+ logging.debug(f"Loaded mapping table:\n{mapping}")
except (YAMLError, IOError) as err:
raise PresentationError(
- msg="An error occurred while parsing the mapping file "
- "'{0}'.".format(mapping_file_name),
- details=repr(err))
- # Make sure everything is lowercase
- if mapping:
- self._specification["configuration"]["mapping"] = \
- {key.lower(): val.lower() for key, val in mapping.iteritems()}
+ msg=f"An error occurred while parsing the mapping file "
+ f"{mapping_file_name}",
+ details=repr(err)
+ )
else:
- self._specification["configuration"]["mapping"] = dict()
+ self.environment[u"mapping"] = dict()
- # Ignore list:
- ignore = None
- ignore_list_name = self._specification["configuration"].\
- get("ignore-list", None)
+ def _load_ignore_list(self):
+ """Load an ignore list if it is specified. If not, use empty list.
+ """
+
+ ignore_list_name = self.environment.get(u"ignore-list", None)
if ignore_list_name:
- logging.debug("Ignore list file: '{0}'".format(ignore_list_name))
try:
- with open(ignore_list_name, 'r') as ifile:
- ignore = load(ifile)
- logging.debug("Loaded ignore list:\n{0}".format(ignore))
+ with open(ignore_list_name, u'r') as ifile:
+ ignore = load(ifile, Loader=FullLoader)
+ # Make sure everything is lowercase
+ self.environment[u"ignore"] = \
+ [item.lower() for item in ignore]
+ logging.debug(f"Loaded ignore list:\n{ignore}")
except (YAMLError, IOError) as err:
raise PresentationError(
- msg="An error occurred while parsing the ignore list file "
- "'{0}'.".format(ignore_list_name),
- details=repr(err))
- # Make sure everything is lowercase
- if ignore:
- self._specification["configuration"]["ignore"] = \
- [item.lower() for item in ignore]
+ msg=f"An error occurred while parsing the ignore list file "
+ f"{ignore_list_name}.",
+ details=repr(err)
+ )
else:
- self._specification["configuration"]["ignore"] = list()
-
- logging.info("Done.")
-
- def _parse_input(self):
- """Parse input specification in the specification YAML file.
-
- :raises: PresentationError if there are no data to process.
- """
-
- logging.info("Parsing specification file: input ...")
-
- idx = self._get_type_index("input")
- if idx is None:
- raise PresentationError("No data to process.")
-
- try:
- for key, value in self._cfg_yaml[idx]["general"].items():
- self._specification["input"][key] = value
- self._specification["input"]["builds"] = dict()
-
- for job, builds in self._cfg_yaml[idx]["builds"].items():
- if builds:
- if isinstance(builds, dict):
- build_end = builds.get("end", None)
- try:
- build_end = int(build_end)
- except ValueError:
- # defined as a range <start, build_type>
- build_end = self._get_build_number(job, build_end)
- builds = [x for x in range(builds["start"], build_end+1)
- if x not in builds.get("skip", list())]
- self._specification["input"]["builds"][job] = list()
- for build in builds:
- self._specification["input"]["builds"][job]. \
- append({"build": build, "status": None})
-
- else:
- logging.warning("No build is defined for the job '{}'. "
- "Trying to continue without it.".
- format(job))
- except KeyError:
- raise PresentationError("No data to process.")
-
- logging.info("Done.")
+ self.environment[u"ignore"] = list()
def _parse_output(self):
"""Parse output specification in the specification YAML file.
:raises: PresentationError if there is no output defined.
"""
- logging.info("Parsing specification file: output ...")
+ logging.info(u"Parsing specification: OUTPUT")
- idx = self._get_type_index("output")
+ idx = self._get_type_index(u"output")
if idx is None:
- raise PresentationError("No output defined.")
+ raise PresentationError(u"No output defined.")
try:
- self._specification["output"] = self._cfg_yaml[idx]
+ self._specification[u"output"] = self._cfg_yaml[idx]
except (KeyError, IndexError):
- raise PresentationError("No output defined.")
-
- logging.info("Done.")
+ raise PresentationError(u"No output defined.")
def _parse_static(self):
"""Parse specification of the static content in the specification YAML
file.
"""
- logging.info("Parsing specification file: static content ...")
+ logging.info(u"Parsing specification: STATIC CONTENT")
- idx = self._get_type_index("static")
+ idx = self._get_type_index(u"static")
if idx is None:
- logging.warning("No static content specified.")
+ logging.warning(u"No static content specified.")
+ self._specification[u"static"] = dict()
+ return
for key, value in self._cfg_yaml[idx].items():
if isinstance(value, str):
try:
self._cfg_yaml[idx][key] = self._replace_tags(
- value, self._specification["environment"]["paths"])
+ value, self._specification[u"environment"][u"paths"])
except KeyError:
pass
- self._specification["static"] = self._cfg_yaml[idx]
+ self._specification[u"static"] = self._cfg_yaml[idx]
+
+ def _parse_elements_tables(self, table):
+ """Parse tables from the specification YAML file.
+
+ :param table: Table to be parsed from the specification file.
+ :type table: dict
+ :raises PresentationError: If wrong data set is used.
+ """
+
+ try:
+ table[u"template"] = self._replace_tags(
+ table[u"template"],
+ self._specification[u"environment"][u"paths"])
+ except KeyError:
+ pass
+
+ # Add data sets
+ try:
+ for item in (u"reference", u"compare"):
+ if table.get(item, None):
+ data_set = table[item].get(u"data", None)
+ if isinstance(data_set, str):
+ table[item][u"data"] = self.data_sets[data_set]
+ data_set = table[item].get(u"data-replacement", None)
+ if isinstance(data_set, str):
+ table[item][u"data-replacement"] = \
+ self.data_sets[data_set]
+
+ if table.get(u"columns", None):
+ for i in range(len(table[u"columns"])):
+ data_set = table[u"columns"][i].get(u"data-set", None)
+ if isinstance(data_set, str):
+ table[u"columns"][i][u"data-set"] = \
+ self.data_sets[data_set]
+ data_set = table[u"columns"][i].get(
+ u"data-replacement", None)
+ if isinstance(data_set, str):
+ table[u"columns"][i][u"data-replacement"] = \
+ self.data_sets[data_set]
+
+ except KeyError:
+ raise PresentationError(
+ f"Wrong data set used in {table.get(u'title', u'')}."
+ )
+
+ self._specification[u"tables"].append(table)
+
+ def _parse_elements_plots(self, plot):
+ """Parse plots from the specification YAML file.
+
+ :param plot: Plot to be parsed from the specification file.
+ :type plot: dict
+ :raises PresentationError: If plot layout is not defined.
+ """
+
+ # Add layout to the plots:
+ layout = plot[u"layout"].get(u"layout", None)
+ if layout is not None:
+ plot[u"layout"].pop(u"layout")
+ try:
+ for key, val in self.layouts[layout].items():
+ plot[u"layout"][key] = val
+ except KeyError:
+ raise PresentationError(f"Layout {layout} is not defined.")
+ self._specification[u"plots"].append(plot)
+
+ def _parse_elements_files(self, file):
+ """Parse files from the specification YAML file.
+
+ :param file: File to be parsed from the specification file.
+ :type file: dict
+ """
+
+ try:
+ file[u"dir-tables"] = self._replace_tags(
+ file[u"dir-tables"],
+ self._specification[u"environment"][u"paths"])
+ except KeyError:
+ pass
+ self._specification[u"files"].append(file)
+
+ def _parse_elements_cpta(self, cpta):
+ """Parse cpta from the specification YAML file.
- logging.info("Done.")
+ :param cpta: cpta to be parsed from the specification file.
+ :type cpta: dict
+ :raises PresentationError: If wrong data set is used or if plot layout
+ is not defined.
+ """
+
+ for plot in cpta[u"plots"]:
+ # Add layout to the plots:
+ layout = plot.get(u"layout", None)
+ if layout is not None:
+ try:
+ plot[u"layout"] = self.layouts[layout]
+ except KeyError:
+ raise PresentationError(f"Layout {layout} is not defined.")
+ # Add data sets:
+ if isinstance(plot.get(u"data", None), str):
+ data_set = plot[u"data"]
+ try:
+ plot[u"data"] = self.data_sets[data_set]
+ except KeyError:
+ raise PresentationError(
+ f"Data set {data_set} is not defined."
+ )
+ self._specification[u"cpta"] = cpta
def _parse_elements(self):
- """Parse elements (tables, plots) specification in the specification
+ """Parse elements (tables, plots, ..) specification in the specification
YAML file.
"""
- logging.info("Parsing specification file: elements ...")
+ logging.info(u"Parsing specification: ELEMENTS")
count = 1
for element in self._cfg_yaml:
+
+ # Replace tags:
try:
- element["output-file"] = self._replace_tags(
- element["output-file"],
- self._specification["environment"]["paths"])
+ element[u"output-file"] = self._replace_tags(
+ element[u"output-file"],
+ self.environment[u"paths"]
+ )
except KeyError:
pass
try:
- element["input-file"] = self._replace_tags(
- element["input-file"],
- self._specification["environment"]["paths"])
+ element[u"input-file"] = self._replace_tags(
+ element[u"input-file"],
+ self.environment[u"paths"]
+ )
except KeyError:
pass
- # add data sets to the elements:
- if isinstance(element.get("data", None), str):
- data_set = element["data"]
- try:
- element["data"] = self.configuration["data-sets"][data_set]
- except KeyError:
- raise PresentationError("Data set {0} is not defined in "
- "the configuration section.".
- format(data_set))
-
- if element["type"] == "table":
- logging.info(" {:3d} Processing a table ...".format(count))
- try:
- element["template"] = self._replace_tags(
- element["template"],
- self._specification["environment"]["paths"])
- except KeyError:
- pass
+ try:
+ element[u"output-file-links"] = self._replace_tags(
+ element[u"output-file-links"],
+ self.environment[u"paths"]
+ )
+ except KeyError:
+ pass
- # add data sets
+ # Add data sets to the elements:
+ if isinstance(element.get(u"data", None), str):
+ data_set = element[u"data"]
try:
- for item in ("reference", "compare"):
- if element.get(item, None):
- data_set = element[item].get("data", None)
- if isinstance(data_set, str):
- element[item]["data"] = \
- self.configuration["data-sets"][data_set]
- data_set = element[item].get("data-replacement",
- None)
- if isinstance(data_set, str):
- element[item]["data-replacement"] = \
- self.configuration["data-sets"][data_set]
-
- if element.get("history", None):
- for i in range(len(element["history"])):
- data_set = element["history"][i].get("data", None)
- if isinstance(data_set, str):
- element["history"][i]["data"] = \
- self.configuration["data-sets"][data_set]
- data_set = element["history"][i].get(
- "data-replacement", None)
- if isinstance(data_set, str):
- element["history"][i]["data-replacement"] = \
- self.configuration["data-sets"][data_set]
-
+ element[u"data"] = self.data_sets[data_set]
except KeyError:
- raise PresentationError("Wrong data set used in {0}.".
- format(element.get("title", "")))
-
- self._specification["tables"].append(element)
- count += 1
-
- elif element["type"] == "plot":
- logging.info(" {:3d} Processing a plot ...".format(count))
-
- # Add layout to the plots:
- layout = element["layout"].get("layout", None)
- if layout is not None:
- element["layout"].pop("layout")
+ raise PresentationError(
+ f"Data set {data_set} is not defined."
+ )
+ elif isinstance(element.get(u"data", None), list):
+ new_list = list()
+ for item in element[u"data"]:
try:
- for key, val in (self.configuration["plot-layouts"]
- [layout].items()):
- element["layout"][key] = val
+ new_list.append(self.data_sets[item])
except KeyError:
- raise PresentationError("Layout {0} is not defined in "
- "the configuration section.".
- format(layout))
- self._specification["plots"].append(element)
+ raise PresentationError(
+ f"Data set {item} is not defined."
+ )
+ element[u"data"] = new_list
+
+ # Parse elements:
+ if element[u"type"] == u"table":
+ logging.info(f" {count:3d} Processing a table ...")
+ self._parse_elements_tables(element)
count += 1
-
- elif element["type"] == "file":
- logging.info(" {:3d} Processing a file ...".format(count))
- try:
- element["dir-tables"] = self._replace_tags(
- element["dir-tables"],
- self._specification["environment"]["paths"])
- except KeyError:
- pass
- self._specification["files"].append(element)
+ elif element[u"type"] == u"plot":
+ logging.info(f" {count:3d} Processing a plot ...")
+ self._parse_elements_plots(element)
+ count += 1
+ elif element[u"type"] == u"file":
+ logging.info(f" {count:3d} Processing a file ...")
+ self._parse_elements_files(element)
+ count += 1
+ elif element[u"type"] == u"cpta":
+ logging.info(
+ f" {count:3d} Processing Continuous Performance Trending "
+ f"and Analysis ..."
+ )
+ self._parse_elements_cpta(element)
count += 1
- elif element["type"] == "cpta":
- logging.info(" {:3d} Processing Continuous Performance "
- "Trending and Analysis ...".format(count))
+ def _prepare_input(self):
+ """Use information from data sets and generate list of jobs and builds
+ to download.
+ """
- for plot in element["plots"]:
- # Add layout to the plots:
- layout = plot.get("layout", None)
- if layout is not None:
- try:
- plot["layout"] = \
- self.configuration["plot-layouts"][layout]
- except KeyError:
- raise PresentationError(
- "Layout {0} is not defined in the "
- "configuration section.".format(layout))
- # Add data sets:
- if isinstance(plot.get("data", None), str):
- data_set = plot["data"]
- try:
- plot["data"] = \
- self.configuration["data-sets"][data_set]
- except KeyError:
- raise PresentationError(
- "Data set {0} is not defined in "
- "the configuration section.".
- format(data_set))
- self._specification["cpta"] = element
- count += 1
+ logging.info(u"Parsing specification: INPUT")
- logging.info("Done.")
+ idx = self._get_type_index(u"input")
+ if idx is None:
+ logging.info(u"Creating the list of inputs from data sets.")
+ for data_set in self.data_sets.values():
+ if data_set == "data-sets":
+ continue
+ for job, builds in data_set.items():
+ for build in builds:
+ self.add_build(
+ job,
+ {
+ u"build": build,
+ u"status": None,
+ u"file-name": None,
+ u"source": None
+ }
+ )
+ else:
+ logging.info(u"Reading pre-defined inputs.")
+ for job, builds in self._cfg_yaml[idx][u"builds"].items():
+ for build in builds:
+ self.add_build(
+ job,
+ {
+ u"build": build,
+ u"status": None,
+ u"file-name": None,
+ u"source": None
+ }
+ )
+
+ if self.environment[u"reverse-input"]:
+ for builds in self.input.values():
+ builds.sort(key=lambda k: k[u"build"], reverse=True)
def read_specification(self):
- """Parse specification in the specification YAML file.
+ """Parse specification in the specification YAML files.
:raises: PresentationError if an error occurred while parsing the
- specification file.
+ specification file.
"""
- try:
- self._cfg_yaml = load(self._cfg_file)
- except YAMLError as err:
- raise PresentationError(msg="An error occurred while parsing the "
- "specification file.",
- details=str(err))
+
+ # It always starts with environment.yaml file, it must be present.
+ spec_file = join(self._cfg_dir, u"environment.yaml")
+ logging.info(f"Reading {spec_file}")
+ if not exists(spec_file):
+ raise PresentationError(f"The file {spec_file} does not exist.")
+
+ with open(spec_file, u"r") as file_read:
+ try:
+ self._cfg_yaml = load(file_read, Loader=FullLoader)
+ except YAMLError as err:
+ raise PresentationError(
+ f"An error occurred while parsing the specification file "
+ f"{spec_file}",
+ details=repr(err)
+ )
+
+ # Load the other specification files specified in the environment.yaml
+ idx = self._get_type_index(u"environment")
+ if idx is None:
+ raise PresentationError(
+ f"No environment defined in the file {spec_file}"
+ )
+ for spec_file in self._cfg_yaml[idx].get(u"spec-files", tuple()):
+ logging.info(f"Reading {spec_file}")
+ if not exists(spec_file):
+ raise PresentationError(f"The file {spec_file} does not exist.")
+ spec = None
+ with open(spec_file, u"r") as file_read:
+ try:
+ spec = load(file_read, Loader=FullLoader)
+ except YAMLError as err:
+ raise PresentationError(
+ f"An error occurred while parsing the specification "
+ f"file {spec_file}",
+ details=repr(err)
+ )
+ if spec:
+ self._cfg_yaml.extend(spec)
self._parse_env()
- self._parse_configuration()
- self._parse_input()
+ self._parse_layouts()
+ self._parse_data_sets()
self._parse_output()
self._parse_static()
self._parse_elements()
+ self._prepare_input()
- logging.debug("Specification: \n{}".
- format(pformat(self._specification)))
+ logging.debug(f"Specification: \n{pformat(self.specification)}")