75db4b6a40ee03889c63c7f3898360ee1a3dbb4d
[csit.git] / resources / tools / telemetry / executor.py
1 # Copyright (c) 2021 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Config executor library."""
15
16 from importlib import import_module
17 from logging.config import dictConfig
18 from logging import getLogger
19 import sys
20
21 from .parser import Parser
22 from .serializer import Serializer
23
24
25 class Executor:
26     """
27     Executor class reponsible for executing configuration.
28     """
29     def __init__(self, configuration_file):
30         """
31         Config Executor init.
32
33         :param configuration_file: Telemetry configuration file path.
34         :type configuration_file: str
35         """
36         self.parser = Parser(configuration_file)
37         self.log = self.parser.config[u"logging"]
38         self.programs = self.parser.config[u"programs"]
39         self.scheduler = self.parser.config[u"scheduler"]
40
41         dictConfig(self.log)
42
43     def execute(self, hook=None):
44         """
45         Main executor function will run programs from all bundles in a loop.
46
47         Function call:
48             attach(duration)
49             fetch_data()
50             process_data()
51             detach()
52
53         :param hook: Process ID or socket to attach. None by default.
54         :type hook: int
55         """
56         for program in self.programs:
57             serializer = Serializer()
58             try:
59                 package = program[u"name"]
60                 name = f"telemetry.{package}"
61                 package = package.replace("_", " ").title().replace(" ", "")
62                 module = import_module(
63                     name=name,
64                     package=package
65                 )
66                 bundle = getattr(module, package)(
67                     program=program,
68                     serializer=serializer,
69                     hook=hook
70                 )
71                 bundle.attach(duration=self.scheduler[u"duration"])
72                 bundle.fetch_data()
73                 bundle.process_data()
74                 bundle.detach()
75             except (ImportError, AttributeError) as exc:
76                 raise ExecutorError(
77                     f"Error executing bundle {package!r}! - {exc}"
78                 )
79             serializer.publish()
80
81     def execute_daemon(self, hook=None):
82         """
83         Daemon executor will execute endless loop.
84
85         :param hook: Process ID to attach. None by default.
86         :type hook: int
87         """
88         while True:
89             self.execute(hook=hook)
90
91
92 class ExecutorError(Exception):
93     """
94     Creates a Executor Error Exception. This exception is supposed to handle
95     all the errors raised during executing.
96     """
97     def __init__(self, message):
98         """
99         Execute Error Excpetion init.
100
101         :param message: Exception error message.
102         :type message: str
103         """
104         super().__init__()
105         self.message = message
106         getLogger(__name__).error(message)
107         sys.exit(1)