feat(uti): Add iterative data
[csit.git] / resources / tools / dash / app / pal / data / data.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Prepare data for Plotly Dash."""
15
16 import logging
17
18 from yaml import load, FullLoader, YAMLError
19 from datetime import datetime, timedelta
20 from time import time
21 from pytz import UTC
22 from pandas import DataFrame
23
24 import awswrangler as wr
25
26 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
27
28
29 class Data:
30     """
31     """
32
33     def __init__(self, data_spec_file: str, debug: bool=False) -> None:
34         """
35         """
36
37         # Inputs:
38         self._data_spec_file = data_spec_file
39         self._debug = debug
40
41         # Specification of data to be read from parquets:
42         self._data_spec = None
43
44         # Data frame to keep the data:
45         self._data = None
46
47         # Read from files:
48         try:
49             with open(self._data_spec_file, "r") as file_read:
50                 self._data_spec = load(file_read, Loader=FullLoader)
51         except IOError as err:
52             raise RuntimeError(
53                 f"Not possible to open the file {self._data_spec_file,}\n{err}"
54             )
55         except YAMLError as err:
56             raise RuntimeError(
57                 f"An error occurred while parsing the specification file "
58                 f"{self._data_spec_file,}\n"
59                 f"{err}"
60             )
61
62     @property
63     def data(self):
64         return self._data
65
66     def _get_columns(self, parquet: str) -> list:
67         try:
68             return self._data_spec[parquet]["columns"]
69         except KeyError as err:
70             raise RuntimeError(
71                 f"The parquet {parquet} is not defined in the specification "
72                 f"file {self._data_spec_file} or it does not have any columns "
73                 f"specified.\n{err}"
74             )
75
76     def _get_path(self, parquet: str) -> str:
77         try:
78             return self._data_spec[parquet]["path"]
79         except KeyError as err:
80             raise RuntimeError(
81                 f"The parquet {parquet} is not defined in the specification "
82                 f"file {self._data_spec_file} or it does not have the path "
83                 f"specified.\n{err}"
84             )
85
86     def _create_dataframe_from_parquet(self,
87         path, partition_filter=None, columns=None,
88         validate_schema=False, last_modified_begin=None,
89         last_modified_end=None, days=None) -> DataFrame:
90         """Read parquet stored in S3 compatible storage and returns Pandas
91         Dataframe.
92
93         :param path: S3 prefix (accepts Unix shell-style wildcards)
94             (e.g. s3://bucket/prefix) or list of S3 objects paths
95             (e.g. [s3://bucket/key0, s3://bucket/key1]).
96         :param partition_filter: Callback Function filters to apply on PARTITION
97             columns (PUSH-DOWN filter). This function MUST receive a single
98             argument (Dict[str, str]) where keys are partitions names and values
99             are partitions values. Partitions values will be always strings
100             extracted from S3. This function MUST return a bool, True to read
101             the partition or False to ignore it. Ignored if dataset=False.
102         :param columns: Names of columns to read from the file(s).
103         :param validate_schema: Check that individual file schemas are all the
104             same / compatible. Schemas within a folder prefix should all be the
105             same. Disable if you have schemas that are different and want to
106             disable this check.
107         :param last_modified_begin: Filter the s3 files by the Last modified
108             date of the object. The filter is applied only after list all s3
109             files.
110         :param last_modified_end: Filter the s3 files by the Last modified date
111             of the object. The filter is applied only after list all s3 files.
112         :type path: Union[str, List[str]]
113         :type partition_filter: Callable[[Dict[str, str]], bool], optional
114         :type columns: List[str], optional
115         :type validate_schema: bool, optional
116         :type last_modified_begin: datetime, optional
117         :type last_modified_end: datetime, optional
118         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
119         :rtype: DataFrame
120         """
121         df = None
122         start = time()
123         if days:
124             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
125         try:
126             df = wr.s3.read_parquet(
127                 path=path,
128                 path_suffix="parquet",
129                 ignore_empty=True,
130                 validate_schema=validate_schema,
131                 use_threads=True,
132                 dataset=True,
133                 columns=columns,
134                 partition_filter=partition_filter,
135                 last_modified_begin=last_modified_begin,
136                 last_modified_end=last_modified_end
137             )
138             if self._debug:
139                 df.info(verbose=True, memory_usage='deep')
140                 logging.info(
141                     u"\n"
142                     f"Creation of dataframe {path} took: {time() - start}"
143                     u"\n"
144                 )
145         except NoFilesFound as err:
146             logging.error(f"No parquets found.\n{err}")
147         except EmptyDataFrame as err:
148             logging.error(f"No data.\n{err}")
149
150         self._data = df
151         return df
152
153     def read_stats(self, days: int=None) -> tuple:
154         """Read Suite Result Analysis data partition from parquet.
155         """
156         l_stats = lambda part: True if part["stats_type"] == "sra" else False
157         l_mrr = lambda part: True if part["test_type"] == "mrr" else False
158         l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
159
160         return (
161             self._create_dataframe_from_parquet(
162                 path=self._get_path("statistics"),
163                 partition_filter=l_stats,
164                 columns=self._get_columns("statistics"),
165                 days=days
166             ),
167             self._create_dataframe_from_parquet(
168                 path=self._get_path("statistics-trending"),
169                 partition_filter=l_mrr,
170                 columns=self._get_columns("statistics-trending"),
171                 days=days
172             ),
173             self._create_dataframe_from_parquet(
174                 path=self._get_path("statistics-trending"),
175                 partition_filter=l_ndrpdr,
176                 columns=self._get_columns("statistics-trending"),
177                 days=days
178             )
179         )
180
181     def read_trending_mrr(self, days: int=None) -> DataFrame:
182         """Read MRR data partition from parquet.
183         """
184         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
185
186         return self._create_dataframe_from_parquet(
187             path=self._get_path("trending-mrr"),
188             partition_filter=lambda_f,
189             columns=self._get_columns("trending-mrr"),
190             days=days
191         )
192
193     def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
194         """Read NDRPDR data partition from iterative parquet.
195         """
196         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
197
198         return self._create_dataframe_from_parquet(
199             path=self._get_path("trending-ndrpdr"),
200             partition_filter=lambda_f,
201             columns=self._get_columns("trending-ndrpdr"),
202             days=days
203         )
204
205     def read_iterative_mrr(self, release: str) -> DataFrame:
206         """Read MRR data partition from iterative parquet.
207         """
208         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
209
210         return self._create_dataframe_from_parquet(
211             path=self._get_path("iterative-mrr").format(release=release),
212             partition_filter=lambda_f,
213             columns=self._get_columns("iterative-mrr")
214         )
215
216     def read_iterative_ndrpdr(self, release: str) -> DataFrame:
217         """Read NDRPDR data partition from parquet.
218         """
219         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
220
221         return self._create_dataframe_from_parquet(
222             path=self._get_path("iterative-ndrpdr").format(release=release),
223             partition_filter=lambda_f,
224             columns=self._get_columns("iterative-ndrpdr")
225         )