-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
"""
return self._specification[u"input"]
+ @input.setter
+ def input(self, new_value):
+ """Setter - specification - inputs.
+
+ :param new_value: New value to be set.
+ :type new_value: dict
+ """
+ self._specification[u"input"] = new_value
+
@property
def builds(self):
"""Getter - builds defined in specification.
"""
return self.input[u"builds"]
+ @builds.setter
+ def builds(self, new_value):
+ """Setter - builds defined in specification.
+
+ :param new_value: New value to be set.
+ :type new_value: dict
+ """
+ self.input[u"builds"] = new_value
+
+ def add_build(self, job, build):
+ """Add a build to the specification.
+
+ :param job: The job which run the build.
+ :param build: The build to be added.
+ :type job: str
+ :type build: dict
+ """
+ if self._specification[u"input"][u"builds"].get(job, None) is None:
+ self._specification[u"input"][u"builds"][job] = list()
+ self._specification[u"input"][u"builds"][job].append(build)
+
@property
def output(self):
"""Getter - specification - output formats and versions to be generated.
generated.
:returns: List of specifications of Continuous Performance Trending and
- Analysis to be generated.
+ Analysis to be generated.
:rtype: list
"""
return self._specification[u"cpta"]
def set_input_state(self, job, build_nr, state):
"""Set the state of input
- :param job:
- :param build_nr:
- :param state:
- :return:
+ :param job: Job name.
+ :param build_nr: Build number.
+ :param state: The new input state.
+ :type job: str
+ :type build_nr: int
+ :type state: str
+ :raises: PresentationError if wrong job and/or build is provided.
"""
try:
def set_input_file_name(self, job, build_nr, file_name):
"""Set the state of input
- :param job:
- :param build_nr:
- :param file_name:
- :return:
+ :param job: Job name.
+ :param build_nr: Build number.
+ :param file_name: The new file name.
+ :type job: str
+ :type build_nr: int
+ :type file_name: str
+ :raises: PresentationError if wrong job and/or build is provided.
"""
try:
- lastCompletedBuild
:type job" str
:raises PresentationError: If it is not possible to get the build
- number.
+ number.
:returns: The build number.
:rtype: int
"""
specification YAML file.
:param item_type: Item type: Top level items in specification YAML file,
- e.g.: environment, input, output.
+ e.g.: environment, input, output.
:type item_type: str
:returns: Index of the given item type.
:rtype: int
:param data: The data where the tags will be replaced by their values.
:param src_data: Data where the tags are defined. It is dictionary where
- the key is the tag and the value is the tag value. If not given, 'data'
- is used instead.
- :type data: str or dict
+ the key is the tag and the value is the tag value. If not given,
+ 'data' is used instead.
+ :type data: str, list or dict
:type src_data: dict
:returns: Data with the tags replaced.
- :rtype: str or dict
+ :rtype: str, list or dict
:raises: PresentationError if it is not possible to replace the tag or
- the data is not the supported data type (str, dict).
+ the data is not the supported data type (str, list or dict).
"""
if src_data is None:
tag = self._find_tag(data)
if tag is not None:
data = data.replace(tag, src_data[tag[1:-1]])
+ return data
+
+ if isinstance(data, list):
+ new_list = list()
+ for item in data:
+ new_list.append(self._replace_tags(item, src_data))
+ return new_list
- elif isinstance(data, dict):
+ if isinstance(data, dict):
counter = 0
for key, value in data.items():
tag = self._find_tag(value)
)
if counter:
self._replace_tags(data, src_data)
- else:
- raise PresentationError(u"Replace tags: Not supported data type.")
+ return data
- return data
+ raise PresentationError(u"Replace tags: Not supported data type.")
def _parse_env(self):
"""Parse environment specification in the specification YAML file.
continue
if isinstance(builds, dict):
build_end = builds.get(u"end", None)
+ max_builds = builds.get(u"max-builds", None)
+ reverse = builds.get(u"reverse", False)
try:
build_end = int(build_end)
except ValueError:
# defined as a range <start, build_type>
build_end = self._get_build_number(job, build_end)
- builds = [x for x in range(builds[u"start"],
- build_end + 1)
- if x not in builds.get(u"skip", list())]
+ builds = list(range(builds[u"start"], build_end + 1))
+ if max_builds and max_builds < len(builds):
+ builds = builds[-max_builds:]
+ if reverse:
+ builds.reverse()
self.configuration[u"data-sets"][set_name][job] = builds
elif isinstance(builds, list):
for idx, item in enumerate(builds):
if builds:
if isinstance(builds, dict):
build_end = builds.get(u"end", None)
+ max_builds = builds.get(u"max-builds", None)
+ reverse = bool(builds.get(u"reverse", False))
try:
build_end = int(build_end)
except ValueError:
# defined as a range <start, build_type>
+ if build_end in (u"lastCompletedBuild",
+ u"lastSuccessfulBuild"):
+ reverse = True
build_end = self._get_build_number(job, build_end)
builds = [x for x in range(builds[u"start"],
build_end + 1)
if x not in builds.get(u"skip", list())]
+ if reverse:
+ builds.reverse()
+ if max_builds and max_builds < len(builds):
+ builds = builds[:max_builds]
self._specification[u"input"][u"builds"][job] = list()
for build in builds:
self._specification[u"input"][u"builds"][job]. \
f"No build is defined for the job {job}. Trying to "
f"continue without it."
)
+
except KeyError:
raise PresentationError(u"No data to process.")
if isinstance(data_set, str):
table[u"history"][i][u"data-replacement"] = \
self.configuration[u"data-sets"][data_set]
+
+ if table.get(u"columns", None):
+ for i in range(len(table[u"columns"])):
+ data_set = table[u"columns"][i].get(u"data-set", None)
+ if isinstance(data_set, str):
+ table[u"columns"][i][u"data-set"] = \
+ self.configuration[u"data-sets"][data_set]
+ data_set = table[u"columns"][i].get(
+ u"data-replacement", None)
+ if isinstance(data_set, str):
+ table[u"columns"][i][u"data-replacement"] = \
+ self.configuration[u"data-sets"][data_set]
+
except KeyError:
raise PresentationError(
f"Wrong data set used in {table.get(u'title', u'')}."
except KeyError:
pass
+ try:
+ element[u"output-file-links"] = self._replace_tags(
+ element[u"output-file-links"],
+ self._specification[u"environment"][u"paths"])
+ except KeyError:
+ pass
+
# Add data sets to the elements:
if isinstance(element.get(u"data", None), str):
data_set = element[u"data"]
f"Data set {data_set} is not defined in the "
f"configuration section."
)
+ elif isinstance(element.get(u"data", None), list):
+ new_list = list()
+ for item in element[u"data"]:
+ try:
+ new_list.append(
+ self.configuration[u"data-sets"][item]
+ )
+ except KeyError:
+ raise PresentationError(
+ f"Data set {item} is not defined in the "
+ f"configuration section."
+ )
+ element[u"data"] = new_list
# Parse elements:
if element[u"type"] == u"table":
"""Parse specification in the specification YAML file.
:raises: PresentationError if an error occurred while parsing the
- specification file.
+ specification file.
"""
try:
self._cfg_yaml = load(self._cfg_file, Loader=FullLoader)
except YAMLError as err:
raise PresentationError(msg=u"An error occurred while parsing the "
u"specification file.",
- details=str(err))
+ details=repr(err))
self._parse_env()
self._parse_configuration()