telemetry: error message handling part2
[csit.git] / resources / tools / telemetry / executor.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Config executor library."""
15
16 from importlib import import_module
17 from logging.config import dictConfig
18 from logging import getLogger
19 import sys
20
21 from .parser import Parser
22 from .serializer import Serializer
23 from .constants import Constants
24
25
26 class Executor:
27     """
28     Executor class reponsible for executing configuration.
29     """
30     def __init__(self, configuration_file):
31         """
32         Config Executor init.
33
34         :param configuration_file: Telemetry configuration file path.
35         :type configuration_file: str
36         """
37         self.parser = Parser(configuration_file)
38         self.log = self.parser.config[u"logging"]
39         self.programs = self.parser.config[u"programs"]
40         self.scheduler = self.parser.config[u"scheduler"]
41
42         dictConfig(self.log)
43
44     def execute(self, hook=None):
45         """
46         Main executor function will run programs from all bundles in a loop.
47
48         Function call:
49             attach(duration)
50             fetch_data()
51             process_data()
52             detach()
53
54         :param hook: Process ID or socket to attach. None by default.
55         :type hook: int
56         """
57         for program in self.programs:
58             serializer = Serializer()
59             try:
60                 package = program[u"name"]
61                 name = f"telemetry.{package}"
62                 package = package.replace("_", " ").title().replace(" ", "")
63                 module = import_module(
64                     name=name,
65                     package=package
66                 )
67                 bundle = getattr(module, package)(
68                     program=program,
69                     serializer=serializer,
70                     hook=hook
71                 )
72                 bundle.attach(duration=self.scheduler[u"duration"])
73                 bundle.fetch_data()
74                 bundle.process_data()
75                 bundle.detach()
76             except (ImportError, AttributeError) as exc:
77                 raise ExecutorError(
78                     f"Error executing bundle {package!r}! - {exc}"
79                 )
80             serializer.publish()
81
82     def execute_daemon(self, hook=None):
83         """
84         Daemon executor will execute endless loop.
85
86         :param hook: Process ID to attach. None by default.
87         :type hook: int
88         """
89         while True:
90             self.execute(hook=hook)
91
92
93 class ExecutorError(Exception):
94     """
95     Creates a Executor Error Exception. This exception is supposed to handle
96     all the errors raised during executing.
97     """
98     def __init__(self, message):
99         """
100         Execute Error Excpetion init.
101
102         :param message: Exception error message.
103         :type message: str
104         """
105         super().__init__()
106         self.message = message
107         getLogger("console_stderr").error(message)
108         sys.exit(Constants.err_telemetry_bundle)