# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Prepare data for Plotly Dash."""

import logging

from time import time
from datetime import datetime, timedelta

import awswrangler as wr

from pytz import UTC
from yaml import load, FullLoader, YAMLError
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound


class Data:
    """Gets the data from parquets and stores it for further use by Dash
    applications.
    """

    def __init__(self, data_spec_file, debug=False):
        """Read and parse the yaml file specifying the data to be read from
        parquets.

        :raises RuntimeError: If the specification file cannot be opened or
            parsed.
        """
        self._data_spec_file = data_spec_file
        self._debug = debug

        # Specification of data to be read from parquets:
        self._data_spec = None

        # Data frame to keep the data:
        self._data = None

        try:
            with open(self._data_spec_file, "r") as file_read:
                self._data_spec = load(file_read, Loader=FullLoader)
        except IOError as err:
            raise RuntimeError(
                f"Not possible to open the file {self._data_spec_file}.\n{err}"
            )
        except YAMLError as err:
            raise RuntimeError(
                f"An error occurred while parsing the specification file "
                f"{self._data_spec_file}.\n{err}"
            )

    def _get_columns(self, parquet):
        """Get the list of columns to be read for the given parquet."""
        try:
            return self._data_spec[parquet]["columns"]
        except KeyError as err:
            raise RuntimeError(
                f"The parquet {parquet} is not defined in the specification "
                f"file {self._data_spec_file} or it does not have any columns "
                f"specified.\n{err}"
            )

    def _get_path(self, parquet):
        """Get the path to the given parquet."""
        try:
            return self._data_spec[parquet]["path"]
        except KeyError as err:
            raise RuntimeError(
                f"The parquet {parquet} is not defined in the specification "
                f"file {self._data_spec_file} or it does not have the path "
                f"specified.\n{err}"
            )
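
    # A minimal sketch of the structure which _get_path() and _get_columns()
    # assume self._data_spec to have after yaml parsing; the path and column
    # names below are hypothetical examples, not the real specification:
    #
    #   {
    #       "statistics": {
    #           "path": "s3://bucket/csit/stats",
    #           "columns": ["job", "build", "passed"]
    #       }
    #   }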

    def _create_dataframe_from_parquet(
            self, path, partition_filter=None, columns=None,
            validate_schema=False, last_modified_begin=None,
            last_modified_end=None, days=None):
        """Read parquet stored in S3 compatible storage and return Pandas
        DataFrame.

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param partition_filter: Callback function filter to apply on PARTITION
            columns (PUSH-DOWN filter). This function MUST receive a single
            argument (Dict[str, str]) where keys are partition names and values
            are partition values. Partition values are always strings extracted
            from S3. This function MUST return a bool, True to read the
            partition or False to ignore it. Ignored if dataset=False.
        :param columns: Names of columns to read from the file(s).
        :param validate_schema: Check that individual file schemas are all the
            same / compatible. Schemas within a folder prefix should all be the
            same. Disable if you have schemas that are different and want to
            skip this check.
        :param last_modified_begin: Filter the s3 files by the Last modified
            date of the object. The filter is applied only after listing all
            s3 files.
        :param last_modified_end: Filter the s3 files by the Last modified date
            of the object. The filter is applied only after listing all s3
            files.
        :param days: If given, read only the files modified within the last
            "days" days (overrides last_modified_begin).
        :type path: Union[str, List[str]]
        :type partition_filter: Callable[[Dict[str, str]], bool], optional
        :type columns: List[str], optional
        :type validate_schema: bool, optional
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :type days: int, optional
        :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
        :rtype: DataFrame
        """
        df = None
        start = time()
        if days:
            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
        try:
            df = wr.s3.read_parquet(
                path=path,
                path_suffix="parquet",
                ignore_empty=True,
                validate_schema=validate_schema,
                use_threads=True,
                dataset=True,
                columns=columns,
                partition_filter=partition_filter,
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end
            )
            if self._debug:
                df.info(verbose=True, memory_usage="deep")
                logging.info(
                    f"Creation of dataframe {path} took: {time() - start}"
                )
        except NoFilesFound as err:
            logging.error(f"No parquets found.\n{err}")
        except EmptyDataFrame as err:
            logging.error(f"No data.\n{err}")

        self._data = df
        return df
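
    # A minimal sketch of a direct call, assuming a hypothetical bucket and
    # partition layout; the partition_filter callback receives the partition
    # values as strings and selects which partitions are read:
    #
    #   df = data._create_dataframe_from_parquet(
    #       path="s3://bucket/trending",
    #       partition_filter=lambda part: part["test_type"] == "mrr",
    #       columns=["job", "build"],
    #       days=30
    #   )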

    def read_stats(self, days=None):
        """Read Suite Result Analysis data partitions from parquet.

        :param days: Number of days back to the past for which the data will
            be read.
        :type days: int
        :returns: Dataframes with sra statistics, mrr trending statistics and
            ndrpdr trending statistics.
        :rtype: tuple
        """
        l_stats = lambda part: part["stats_type"] == "sra"
        l_mrr = lambda part: part["test_type"] == "mrr"
        l_ndrpdr = lambda part: part["test_type"] == "ndrpdr"

        return (
            self._create_dataframe_from_parquet(
                path=self._get_path("statistics"),
                partition_filter=l_stats,
                columns=self._get_columns("statistics"),
                days=days
            ),
            self._create_dataframe_from_parquet(
                path=self._get_path("statistics-trending"),
                partition_filter=l_mrr,
                columns=self._get_columns("statistics-trending"),
                days=days
            ),
            self._create_dataframe_from_parquet(
                path=self._get_path("statistics-trending"),
                partition_filter=l_ndrpdr,
                columns=self._get_columns("statistics-trending"),
                days=days
            )
        )

    def read_trending_mrr(self, days=None):
        """Read MRR data partition from trending parquet."""
        lambda_f = lambda part: part["test_type"] == "mrr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("trending-mrr"),
            partition_filter=lambda_f,
            columns=self._get_columns("trending-mrr"),
            days=days
        )

    def read_trending_ndrpdr(self, days=None):
        """Read NDRPDR data partition from trending parquet."""
        lambda_f = lambda part: part["test_type"] == "ndrpdr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("trending-ndrpdr"),
            partition_filter=lambda_f,
            columns=self._get_columns("trending-ndrpdr"),
            days=days
        )

    def read_iterative_mrr(self, days=None):
        """Read MRR data partition from iterative parquet."""
        lambda_f = lambda part: part["test_type"] == "mrr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("iterative-mrr"),
            partition_filter=lambda_f,
            columns=self._get_columns("iterative-mrr"),
            days=days
        )

    def read_iterative_ndrpdr(self, days=None):
        """Read NDRPDR data partition from iterative parquet."""
        lambda_f = lambda part: part["test_type"] == "ndrpdr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("iterative-ndrpdr"),
            partition_filter=lambda_f,
            columns=self._get_columns("iterative-ndrpdr"),
            days=days
        )
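

# A minimal usage sketch, assuming a hypothetical specification file
# "data.yaml"; the read_* methods return Pandas dataframes, or None when
# no parquet is found:
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    data = Data(data_spec_file="data.yaml", debug=True)
    # Read the last 30 days of trending MRR data:
    df_mrr = data.read_trending_mrr(days=30)
    if df_mrr is not None:
        print(df_mrr.head())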