3d9b8b166417cc03a3ae220a07e70319b623d925
[csit.git] / resources / tools / dash / app / pal / data / data.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Prepare data for Plotly Dash."""
15
16 from datetime import datetime, timedelta
17 import logging
18 from time import time
19
20 import awswrangler as wr
21 from pytz import UTC
22
23 from yaml import load, FullLoader, YAMLError
24 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
25
26
27 class Data:
28     """
29     """
30
31     def __init__(self, data_spec_file, debug=False):
32         """
33         """
34
35         # Inputs:
36         self._data_spec_file = data_spec_file
37         self._debug = debug
38
39         # Specification of data to be read from parquets:
40         self._data_spec = None
41
42         # Data frame to keep the data:
43         self._data = None
44
45         # Read from files:
46         try:
47             with open(self._data_spec_file, "r") as file_read:
48                 self._data_spec = load(file_read, Loader=FullLoader)
49         except IOError as err:
50             raise RuntimeError(
51                 f"Not possible to open the file {self._data_spec_file,}\n{err}"
52             )
53         except YAMLError as err:
54             raise RuntimeError(
55                 f"An error occurred while parsing the specification file "
56                 f"{self._data_spec_file,}\n"
57                 f"{err}"
58             )
59
60     @property
61     def data(self):
62         return self._data
63
64     def _get_columns(self, parquet):
65         try:
66             return self._data_spec[parquet]["columns"]
67         except KeyError as err:
68             raise RuntimeError(
69                 f"The parquet {parquet} is not defined in the specification "
70                 f"file {self._data_spec_file} or it does not have any columns "
71                 f"specified.\n{err}"
72             )
73
74     def _get_path(self, parquet):
75         try:
76             return self._data_spec[parquet]["path"]
77         except KeyError as err:
78             raise RuntimeError(
79                 f"The parquet {parquet} is not defined in the specification "
80                 f"file {self._data_spec_file} or it does not have the path "
81                 f"specified.\n{err}"
82             )
83
84     def _create_dataframe_from_parquet(self,
85         path, partition_filter=None, columns=None,
86         validate_schema=False, last_modified_begin=None,
87         last_modified_end=None, days=None):
88         """Read parquet stored in S3 compatible storage and returns Pandas
89         Dataframe.
90
91         :param path: S3 prefix (accepts Unix shell-style wildcards)
92             (e.g. s3://bucket/prefix) or list of S3 objects paths
93             (e.g. [s3://bucket/key0, s3://bucket/key1]).
94         :param partition_filter: Callback Function filters to apply on PARTITION
95             columns (PUSH-DOWN filter). This function MUST receive a single
96             argument (Dict[str, str]) where keys are partitions names and values
97             are partitions values. Partitions values will be always strings
98             extracted from S3. This function MUST return a bool, True to read
99             the partition or False to ignore it. Ignored if dataset=False.
100         :param columns: Names of columns to read from the file(s).
101         :param validate_schema: Check that individual file schemas are all the
102             same / compatible. Schemas within a folder prefix should all be the
103             same. Disable if you have schemas that are different and want to
104             disable this check.
105         :param last_modified_begin: Filter the s3 files by the Last modified
106             date of the object. The filter is applied only after list all s3
107             files.
108         :param last_modified_end: Filter the s3 files by the Last modified date
109             of the object. The filter is applied only after list all s3 files.
110         :type path: Union[str, List[str]]
111         :type partition_filter: Callable[[Dict[str, str]], bool], optional
112         :type columns: List[str], optional
113         :type validate_schema: bool, optional
114         :type last_modified_begin: datetime, optional
115         :type last_modified_end: datetime, optional
116         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
117         :rtype: DataFrame
118         """
119         df = None
120         start = time()
121         if days:
122             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
123         try:
124             df = wr.s3.read_parquet(
125                 path=path,
126                 path_suffix="parquet",
127                 ignore_empty=True,
128                 validate_schema=validate_schema,
129                 use_threads=True,
130                 dataset=True,
131                 columns=columns,
132                 partition_filter=partition_filter,
133                 last_modified_begin=last_modified_begin,
134                 last_modified_end=last_modified_end
135             )
136             if self._debug:
137                 df.info(verbose=True, memory_usage='deep')
138                 logging.info(
139                     u"\n"
140                     f"Creation of dataframe {path} took: {time() - start}"
141                     u"\n"
142                 )
143         except NoFilesFound as err:
144             logging.error(f"No parquets found.\n{err}")
145         except EmptyDataFrame as err:
146             logging.error(f"No data.\n{err}")
147
148         self._data = df
149         return df
150
151     def read_stats(self, days=None):
152         """Read Suite Result Analysis data partition from parquet.
153         """
154         l_stats = lambda part: True if part["stats_type"] == "sra" else False
155         l_mrr = lambda part: True if part["test_type"] == "mrr" else False
156         l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
157
158         return (
159             self._create_dataframe_from_parquet(
160                 path=self._get_path("statistics"),
161                 partition_filter=l_stats,
162                 columns=self._get_columns("statistics"),
163                 days=days
164             ),
165             self._create_dataframe_from_parquet(
166                 path=self._get_path("statistics-trending"),
167                 partition_filter=l_mrr,
168                 columns=self._get_columns("statistics-trending"),
169                 days=days
170             ),
171             self._create_dataframe_from_parquet(
172                 path=self._get_path("statistics-trending"),
173                 partition_filter=l_ndrpdr,
174                 columns=self._get_columns("statistics-trending"),
175                 days=days
176             )
177         )
178
179     def read_trending_mrr(self, days=None):
180         """Read MRR data partition from parquet.
181         """
182         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
183
184         return self._create_dataframe_from_parquet(
185             path=self._get_path("trending-mrr"),
186             partition_filter=lambda_f,
187             columns=self._get_columns("trending-mrr"),
188             days=days
189         )
190
191     def read_trending_ndrpdr(self, days=None):
192         """Read NDRPDR data partition from iterative parquet.
193         """
194         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
195
196         return self._create_dataframe_from_parquet(
197             path=self._get_path("trending-ndrpdr"),
198             partition_filter=lambda_f,
199             columns=self._get_columns("trending-ndrpdr"),
200             days=days
201         )
202
203     def read_iterative_mrr(self, days=None):
204         """Read MRR data partition from iterative parquet.
205         """
206         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
207
208         return self._create_dataframe_from_parquet(
209             path=self._get_path("iterative-mrr"),
210             partition_filter=lambda_f,
211             columns=self._get_columns("iterative-mrr"),
212             days=days
213         )
214
215     def read_iterative_ndrpdr(self, days=None):
216         """Read NDRPDR data partition from parquet.
217         """
218         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
219
220         return self._create_dataframe_from_parquet(
221             path=self._get_path("iterative-ndrpdr"),
222             partition_filter=lambda_f,
223             columns=self._get_columns("iterative-ndrpdr"),
224             days=days
225         )