X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftools%2Ftelemetry%2Fexecutor.py;fp=resources%2Ftools%2Ftelemetry%2Fexecutor.py;h=75db4b6a40ee03889c63c7f3898360ee1a3dbb4d;hp=0000000000000000000000000000000000000000;hb=d255d2545ee6cdc871bc35314fad72c3c48b225b;hpb=82863d5b8422b1b817d86bd6b1829a06a49feb02 diff --git a/resources/tools/telemetry/executor.py b/resources/tools/telemetry/executor.py new file mode 100644 index 0000000000..75db4b6a40 --- /dev/null +++ b/resources/tools/telemetry/executor.py @@ -0,0 +1,107 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Config executor library.""" + +from importlib import import_module +from logging.config import dictConfig +from logging import getLogger +import sys + +from .parser import Parser +from .serializer import Serializer + + +class Executor: + """ + Executor class reponsible for executing configuration. + """ + def __init__(self, configuration_file): + """ + Config Executor init. + + :param configuration_file: Telemetry configuration file path. + :type configuration_file: str + """ + self.parser = Parser(configuration_file) + self.log = self.parser.config[u"logging"] + self.programs = self.parser.config[u"programs"] + self.scheduler = self.parser.config[u"scheduler"] + + dictConfig(self.log) + + def execute(self, hook=None): + """ + Main executor function will run programs from all bundles in a loop. + + Function call: + attach(duration) + fetch_data() + process_data() + detach() + + :param hook: Process ID or socket to attach. None by default. + :type hook: int + """ + for program in self.programs: + serializer = Serializer() + try: + package = program[u"name"] + name = f"telemetry.{package}" + package = package.replace("_", " ").title().replace(" ", "") + module = import_module( + name=name, + package=package + ) + bundle = getattr(module, package)( + program=program, + serializer=serializer, + hook=hook + ) + bundle.attach(duration=self.scheduler[u"duration"]) + bundle.fetch_data() + bundle.process_data() + bundle.detach() + except (ImportError, AttributeError) as exc: + raise ExecutorError( + f"Error executing bundle {package!r}! - {exc}" + ) + serializer.publish() + + def execute_daemon(self, hook=None): + """ + Daemon executor will execute endless loop. + + :param hook: Process ID to attach. None by default. + :type hook: int + """ + while True: + self.execute(hook=hook) + + +class ExecutorError(Exception): + """ + Creates a Executor Error Exception. This exception is supposed to handle + all the errors raised during executing. + """ + def __init__(self, message): + """ + Execute Error Excpetion init. + + :param message: Exception error message. + :type message: str + """ + super().__init__() + self.message = message + getLogger(__name__).error(message) + sys.exit(1)