a3b6c2a478be4c23893b64d3cd6d5492046048d9
[csit.git] / resources / tools / dash / app / pal / data / data.py
1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Prepare data for Plotly Dash."""
15
16 from datetime import datetime, timedelta
17 import logging
18 from time import time
19
20 import awswrangler as wr
21 from pytz import UTC
22
23 from yaml import load, FullLoader, YAMLError
24 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
25
26
27 class Data:
28     """
29     """
30
31     def __init__(self, data_spec_file, debug=False):
32         """
33         """
34
35         # Inputs:
36         self._data_spec_file = data_spec_file
37         self._debug = debug
38
39         # Specification of data to be read from parquets:
40         self._data_spec = None
41
42         # Data frame to keep the data:
43         self._data = None
44
45         # Read from files:
46         try:
47             with open(self._data_spec_file, "r") as file_read:
48                 self._data_spec = load(file_read, Loader=FullLoader)
49         except IOError as err:
50             raise RuntimeError(
51                 f"Not possible to open the file {self._data_spec_file,}\n{err}"
52             )
53         except YAMLError as err:
54             raise RuntimeError(
55                 f"An error occurred while parsing the specification file "
56                 f"{self._data_spec_file,}\n"
57                 f"{err}"
58             )
59
60     @property
61     def data(self):
62         return self._data
63
64     def _get_columns(self, parquet):
65         try:
66             return self._data_spec[parquet]["columns"]
67         except KeyError as err:
68             raise RuntimeError(
69                 f"The parquet {parquet} is not defined in the specification "
70                 f"file {self._data_spec_file} or it does not have any columns "
71                 f"specified.\n{err}"
72             )
73
74     def _get_path(self, parquet):
75         try:
76             return self._data_spec[parquet]["path"]
77         except KeyError as err:
78             raise RuntimeError(
79                 f"The parquet {parquet} is not defined in the specification "
80                 f"file {self._data_spec_file} or it does not have the path "
81                 f"specified.\n{err}"
82             )
83
84     def _create_dataframe_from_parquet(self,
85         path, partition_filter=None, columns=None,
86         validate_schema=False, last_modified_begin=None,
87         last_modified_end=None, days=None):
88         """Read parquet stored in S3 compatible storage and returns Pandas
89         Dataframe.
90
91         :param path: S3 prefix (accepts Unix shell-style wildcards)
92             (e.g. s3://bucket/prefix) or list of S3 objects paths
93             (e.g. [s3://bucket/key0, s3://bucket/key1]).
94         :param partition_filter: Callback Function filters to apply on PARTITION
95             columns (PUSH-DOWN filter). This function MUST receive a single
96             argument (Dict[str, str]) where keys are partitions names and values
97             are partitions values. Partitions values will be always strings
98             extracted from S3. This function MUST return a bool, True to read
99             the partition or False to ignore it. Ignored if dataset=False.
100         :param columns: Names of columns to read from the file(s).
101         :param validate_schema: Check that individual file schemas are all the
102             same / compatible. Schemas within a folder prefix should all be the
103             same. Disable if you have schemas that are different and want to
104             disable this check.
105         :param last_modified_begin: Filter the s3 files by the Last modified
106             date of the object. The filter is applied only after list all s3
107             files.
108         :param last_modified_end: Filter the s3 files by the Last modified date
109             of the object. The filter is applied only after list all s3 files.
110         :type path: Union[str, List[str]]
111         :type partition_filter: Callable[[Dict[str, str]], bool], optional
112         :type columns: List[str], optional
113         :type validate_schema: bool, optional
114         :type last_modified_begin: datetime, optional
115         :type last_modified_end: datetime, optional
116         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
117         :rtype: DataFrame
118         """
119         df = None
120         start = time()
121         if days:
122             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
123         try:
124             df = wr.s3.read_parquet(
125                 path=path,
126                 path_suffix="parquet",
127                 ignore_empty=True,
128                 validate_schema=validate_schema,
129                 use_threads=True,
130                 dataset=True,
131                 columns=columns,
132                 partition_filter=partition_filter,
133                 last_modified_begin=last_modified_begin,
134                 last_modified_end=last_modified_end
135             )
136             if self._debug:
137                 df.info(verbose=True, memory_usage='deep')
138                 logging.info(
139                     u"\n"
140                     f"Creation of dataframe {path} took: {time() - start}"
141                     u"\n"
142                 )
143         except NoFilesFound as err:
144             logging.error(f"No parquets found.\n{err}")
145         except EmptyDataFrame as err:
146             logging.error(f"No data.\n{err}")
147
148         self._data = df
149         return df
150
151     def read_stats(self, days=None):
152         """Read Suite Result Analysis data partition from parquet.
153         """
154         lambda_f = lambda part: True if part["stats_type"] == "sra" else False
155
156         return self._create_dataframe_from_parquet(
157             path=self._get_path("statistics"),
158             partition_filter=lambda_f,
159             columns=None,  # Get all columns.
160             days=days
161         )
162
163     def read_trending_mrr(self, days=None):
164         """Read MRR data partition from parquet.
165         """
166         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
167
168         return self._create_dataframe_from_parquet(
169             path=self._get_path("trending-mrr"),
170             partition_filter=lambda_f,
171             columns=self._get_columns("trending-mrr"),
172             days=days
173         )
174
175     def read_trending_ndrpdr(self, days=None):
176         """Read NDRPDR data partition from iterative parquet.
177         """
178         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
179
180         return self._create_dataframe_from_parquet(
181             path=self._get_path("trending-ndrpdr"),
182             partition_filter=lambda_f,
183             columns=self._get_columns("trending-ndrpdr"),
184             days=days
185         )
186
187     def read_iterative_mrr(self, days=None):
188         """Read MRR data partition from iterative parquet.
189         """
190         lambda_f = lambda part: True if part["test_type"] == "mrr" else False
191
192         return self._create_dataframe_from_parquet(
193             path=self._get_path("iterative-mrr"),
194             partition_filter=lambda_f,
195             columns=self._get_columns("iterative-mrr"),
196             days=days
197         )
198
199     def read_iterative_ndrpdr(self, days=None):
200         """Read NDRPDR data partition from parquet.
201         """
202         lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
203
204         return self._create_dataframe_from_parquet(
205             path=self._get_path("iterative-ndrpdr"),
206             partition_filter=lambda_f,
207             columns=self._get_columns("iterative-ndrpdr"),
208             days=days
209         )