UTI: PoC - Read data from parquets
[csit.git] / resources / tools / dash / app / pal / data / data.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Prepare data for Plotly Dash."""
15
16 import logging
17 from time import time
18
19 import awswrangler as wr
20
21 from yaml import load, FullLoader, YAMLError
22 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
23
24
25 class Data:
26     """
27     """
28
29     def __init__(self, data_spec_file, debug=False):
30         """
31         """
32
33         # Inputs:
34         self._data_spec_file = data_spec_file
35         self._debug = debug
36
37         # Specification of data to be read from parquets:
38         self._data_spec = None
39
40         # Data frame to keep the data:
41         self._data = None
42
43         # Read from files:
44         try:
45             with open(self._data_spec_file, "r") as file_read:
46                 self._data_spec = load(file_read, Loader=FullLoader)
47         except IOError as err:
48             raise RuntimeError(
49                 f"Not possible to open the file {self._data_spec_file,}\n{err}"
50             )
51         except YAMLError as err:
52             raise RuntimeError(
53                 f"An error occurred while parsing the specification file "
54                 f"{self._data_spec_file,}\n"
55                 f"{err}"
56             )
57
58     @property
59     def data(self):
60         return self._data
61
62     def _get_columns(self, parquet):
63         try:
64             return self._data_spec[parquet]["columns"]
65         except KeyError as err:
66             raise RuntimeError(
67                 f"The parquet {parquet} is not defined in the specification "
68                 f"file {self._data_spec_file} or it does not have any columns "
69                 f"specified.\n{err}"
70             )
71
72     def _get_path(self, parquet):
73         try:
74             return self._data_spec[parquet]["path"]
75         except KeyError as err:
76             raise RuntimeError(
77                 f"The parquet {parquet} is not defined in the specification "
78                 f"file {self._data_spec_file} or it does not have the path "
79                 f"specified.\n{err}"
80             )
81
82     def _create_dataframe_from_parquet(self,
83         path, partition_filter=None, columns=None,
84         validate_schema=False, last_modified_begin=None,
85         last_modified_end=None):
86         """Read parquet stored in S3 compatible storage and returns Pandas
87         Dataframe.
88
89         :param path: S3 prefix (accepts Unix shell-style wildcards)
90             (e.g. s3://bucket/prefix) or list of S3 objects paths
91             (e.g. [s3://bucket/key0, s3://bucket/key1]).
92         :param partition_filter: Callback Function filters to apply on PARTITION
93             columns (PUSH-DOWN filter). This function MUST receive a single
94             argument (Dict[str, str]) where keys are partitions names and values
95             are partitions values. Partitions values will be always strings
96             extracted from S3. This function MUST return a bool, True to read
97             the partition or False to ignore it. Ignored if dataset=False.
98         :param columns: Names of columns to read from the file(s).
99         :param validate_schema: Check that individual file schemas are all the
100             same / compatible. Schemas within a folder prefix should all be the
101             same. Disable if you have schemas that are different and want to
102             disable this check.
103         :param last_modified_begin: Filter the s3 files by the Last modified
104             date of the object. The filter is applied only after list all s3
105             files.
106         :param last_modified_end: Filter the s3 files by the Last modified date
107             of the object. The filter is applied only after list all s3 files.
108         :type path: Union[str, List[str]]
109         :type partition_filter: Callable[[Dict[str, str]], bool], optional
110         :type columns: List[str], optional
111         :type validate_schema: bool, optional
112         :type last_modified_begin: datetime, optional
113         :type last_modified_end: datetime, optional
114         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
115         :rtype: DataFrame
116         """
117         df = None
118         start = time()
119         try:
120             df = wr.s3.read_parquet(
121                 path=path,
122                 path_suffix="parquet",
123                 ignore_empty=True,
124                 validate_schema=validate_schema,
125                 use_threads=True,
126                 dataset=True,
127                 columns=columns,
128                 partition_filter=partition_filter,
129                 last_modified_begin=last_modified_begin,
130                 last_modified_end=last_modified_end
131             )
132             if self._debug:
133                 df.info(verbose=True, memory_usage='deep')
134                 logging.info(
135                     u"\n"
136                     f"Creation of dataframe {path} took: {time() - start}"
137                     u"\n"
138                     f"{df}"
139                     u"\n"
140                 )
141         except NoFilesFound as err:
142             logging.error(f"No parquets found.\n{err}")
143         except EmptyDataFrame as err:
144             logging.error(f"No data.\n{err}")
145
146         self._data = df
147         return df
148
149     def read_stats(self):
150         """Read Suite Result Analysis data partition from parquet.
151         """
152         lambda_f = lambda part: True if part["stats_type"] == "sra" else False
153
154         return self._create_dataframe_from_parquet(
155             path=self._get_path("statistics"),
156             partition_filter=lambda_f,
157             columns=None  # Get all columns.
158         )
159
160     def read_trending_mrr(self):
161         """Read MRR data partition from parquet.
162         """
163         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
164
165         return self._create_dataframe_from_parquet(
166             path=self._get_path("trending-mrr"),
167             partition_filter=lambda_f,
168             columns=self._get_columns("trending-mrr")
169         )
170
171     def read_trending_ndrpdr(self):
172         """Read NDRPDR data partition from iterative parquet.
173         """
174         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
175
176         return self._create_dataframe_from_parquet(
177             path=self._get_path("trending-ndrpdr"),
178             partition_filter=lambda_f,
179             columns=self._get_columns("trending-ndrpdr")
180         )
181
182     def read_iterative_mrr(self):
183         """Read MRR data partition from iterative parquet.
184         """
185         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
186
187         return self._create_dataframe_from_parquet(
188             path=self._get_path("iterative-mrr"),
189             partition_filter=lambda_f,
190             columns=self._get_columns("iterative-mrr")
191         )
192
193     def read_iterative_ndrpdr(self):
194         """Read NDRPDR data partition from parquet.
195         """
196         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
197
198         return self._create_dataframe_from_parquet(
199             path=self._get_path("iterative-ndrpdr"),
200             partition_filter=lambda_f,
201             columns=self._get_columns("iterative-ndrpdr")
202         )