-# Copyright (c) 2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# See the License for the specific language governing permissions and
# limitations under the License.
-"""CSIT Presentation and analytics layer.
+"""CSIT Presentation and Analytics Layer.
"""
import sys
import argparse
import logging
-from errors import PresentationError
-from environment import Environment, clean_environment
+from pal_errors import PresentationError
from specification_parser import Specification
+from environment import Environment, clean_environment
+from static_content import prepare_static_content
from input_data_parser import InputData
from generator_tables import generate_tables
from generator_plots import generate_plots
from generator_files import generate_files
-from static_content import prepare_static_content
from generator_report import generate_report
-from generator_CPTA import generate_cpta
+from generator_cpta import generate_cpta
from generator_alerts import Alerting, AlertingError
+from convert_xml_json import convert_xml_to_json
+
+
+OUTPUTS = (u"none", u"report", u"trending", u"convert-xml-to-json")
def parse_args():
:rtype: ArgumentParser
"""
- parser = argparse.ArgumentParser(description=__doc__,
- formatter_class=argparse.
- RawDescriptionHelpFormatter)
- parser.add_argument("-s", "--specification",
- required=True,
- type=argparse.FileType('r'),
- help="Specification YAML file.")
- parser.add_argument("-r", "--release",
- default="master",
- type=str,
- help="Release string of the product.")
- parser.add_argument("-w", "--week",
- default="1",
- type=str,
- help="Calendar week when the report is published.")
- parser.add_argument("-l", "--logging",
- choices=["DEBUG", "INFO", "WARNING",
- "ERROR", "CRITICAL"],
- default="ERROR",
- help="Logging level.")
- parser.add_argument("-f", "--force",
- action='store_true',
- help="Force removing the old build(s) if present.")
+ parser = argparse.ArgumentParser(
+ description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter
+ )
+ parser.add_argument(
+ u"-s", u"--specification",
+ required=True,
+ type=str,
+ help=u"Specification YAML file."
+ )
+ parser.add_argument(
+ u"-r", u"--release",
+ default=u"master",
+ type=str,
+ help=u"Release string of the product."
+ )
+ parser.add_argument(
+ u"-w", u"--week",
+ default=u"1",
+ type=str,
+ help=u"Calendar week when the report is published."
+ )
+ parser.add_argument(
+ u"-l", u"--logging",
+ choices=[
+ u"NOTSET", u"DEBUG", u"INFO", u"WARNING", u"ERROR", u"CRITICAL"
+ ],
+ default=u"ERROR",
+ help=u"Logging level."
+ )
+ parser.add_argument(
+ u"-f", u"--force",
+ action=u"store_true",
+ help=u"Force removing the old build(s) if present."
+ )
+ parser.add_argument(
+ u"-o", u"--print-all-oper-data",
+ action=u"store_true",
+ help=u"Print all operational data to console. Be careful, the output "
+ u"can be really long."
+ )
+ parser.add_argument(
+ u"-i", u"--input-file",
+ type=str,
+ default=u"",
+ help=u"XML file generated by RobotFramework which will be processed "
+ u"instead of downloading the data from Nexus and/or Jenkins. In "
+ u"this case, the section 'input' in the specification file is "
+ u"ignored."
+ )
+ parser.add_argument(
+ u"-d", u"--input-directory",
+ type=str,
+ default=u"",
+ help=u"Directory with XML file(s) generated by RobotFramework or with "
+ u"sub-directories with XML file(s) which will be processed "
+ u"instead of downloading the data from Nexus and/or Jenkins. In "
+ u"this case, the section 'input' in the specification file is "
+ u"ignored."
+ )
return parser.parse_args()
def main():
"""Main function."""
- log_levels = {"NOTSET": logging.NOTSET,
- "DEBUG": logging.DEBUG,
- "INFO": logging.INFO,
- "WARNING": logging.WARNING,
- "ERROR": logging.ERROR,
- "CRITICAL": logging.CRITICAL}
+ log_levels = {
+ u"NOTSET": logging.NOTSET,
+ u"DEBUG": logging.DEBUG,
+ u"INFO": logging.INFO,
+ u"WARNING": logging.WARNING,
+ u"ERROR": logging.ERROR,
+ u"CRITICAL": logging.CRITICAL
+ }
args = parse_args()
- logging.basicConfig(format='%(asctime)s: %(levelname)s: %(message)s',
- datefmt='%Y/%m/%d %H:%M:%S',
- level=log_levels[args.logging])
+ logging.basicConfig(
+ format=u"%(asctime)s: %(levelname)s: %(message)s",
+ datefmt=u"%Y/%m/%d %H:%M:%S",
+ level=log_levels[args.logging]
+ )
+
+ logging.info(u"Application started.")
- logging.info("Application started.")
try:
spec = Specification(args.specification)
spec.read_specification()
- except PresentationError:
- logging.critical("Finished with error.")
+ except PresentationError as err:
+ logging.critical(u"Finished with error.")
+ logging.critical(repr(err))
return 1
- if spec.output["output"] not in ("report", "CPTA"):
- logging.critical("The output '{0}' is not supported.".
- format(spec.output["output"]))
+ if spec.output[u"output"] not in OUTPUTS:
+ logging.critical(
+ f"The output {spec.output[u'output']} is not supported."
+ )
return 1
- ret_code = 1
+ return_code = 1
try:
env = Environment(spec.environment, args.force)
env.set_environment()
prepare_static_content(spec)
- data = InputData(spec)
- data.download_and_parse_data(repeat=1)
+ data = InputData(spec, spec.output[u"output"])
+ if args.input_file:
+ data.process_local_file(args.input_file)
+ elif args.input_directory:
+ data.process_local_directory(args.input_directory)
+ else:
+ data.download_and_parse_data(repeat=1)
+
+ if args.print_all_oper_data:
+ data.print_all_oper_data()
generate_tables(spec, data)
generate_plots(spec, data)
generate_files(spec, data)
- if spec.output["output"] == "report":
+ if spec.output[u"output"] == u"report":
generate_report(args.release, spec, args.week)
- logging.info("Successfully finished.")
- elif spec.output["output"] == "CPTA":
+ elif spec.output[u"output"] == u"trending":
sys.stdout.write(generate_cpta(spec, data))
try:
alert = Alerting(spec)
alert.generate_alerts()
except AlertingError as err:
logging.warning(repr(err))
- logging.info("Successfully finished.")
- ret_code = 0
+ elif spec.output[u"output"] == u"convert-xml-to-json":
+ convert_xml_to_json(spec, data)
+ else:
+ logging.info("No output will be generated.")
+
+ logging.info(u"Successfully finished.")
+ return_code = 0
except AlertingError as err:
- logging.critical("Finished with an alerting error.")
- logging.critical(repr(err))
+ logging.critical(f"Finished with an alerting error.\n{repr(err)}")
except PresentationError as err:
- logging.critical("Finished with an PAL error.")
- logging.critical(repr(err))
+ logging.critical(f"Finished with a PAL error.\n{str(err)}")
except (KeyError, ValueError) as err:
- logging.critical("Finished with an error.")
- logging.critical(repr(err))
- except Exception as err:
- logging.critical("Finished with an unexpected error.")
- logging.critical(repr(err))
+ logging.critical(f"Finished with an error.\n{repr(err)}")
finally:
if spec is not None:
clean_environment(spec.environment)
- return ret_code
+ return return_code
-if __name__ == '__main__':
+if __name__ == u"__main__":
sys.exit(main())