1 # Copyright (c) 2022 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Prepare data for Plotly Dash."""
19 import awswrangler as wr
21 from yaml import load, FullLoader, YAMLError
22 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
def __init__(self, data_spec_file, debug=False):
    """Initialize the data object: read and parse the specification file.

    :param data_spec_file: Path to the yaml file specifying the data to be
        read from parquets (column lists and S3 paths).
    :param debug: If True, additional debug information is logged.
    :type data_spec_file: str
    :type debug: bool
    :raises RuntimeError: If the specification file cannot be opened or
        parsed.
    """
    # Path to the yaml specification file:
    self._data_spec_file = data_spec_file
    self._debug = debug

    # Specification of data to be read from parquets:
    self._data_spec = None

    # Data frame to keep the data:
    self._data = None

    try:
        with open(self._data_spec_file, "r") as file_read:
            self._data_spec = load(file_read, Loader=FullLoader)
    except IOError as err:
        # Note: the original message interpolated a one-tuple
        # ({self._data_spec_file,}); fixed to interpolate the path itself.
        raise RuntimeError(
            f"Not possible to open the file {self._data_spec_file}\n{err}"
        ) from err
    except YAMLError as err:
        raise RuntimeError(
            f"An error occurred while parsing the specification file "
            f"{self._data_spec_file}\n{err}"
        ) from err
62 def _get_columns(self, parquet):
64 return self._data_spec[parquet]["columns"]
65 except KeyError as err:
67 f"The parquet {parquet} is not defined in the specification "
68 f"file {self._data_spec_file} or it does not have any columns "
72 def _get_path(self, parquet):
74 return self._data_spec[parquet]["path"]
75 except KeyError as err:
77 f"The parquet {parquet} is not defined in the specification "
78 f"file {self._data_spec_file} or it does not have the path "
def _create_dataframe_from_parquet(self,
        path, partition_filter=None, columns=None,
        validate_schema=False, last_modified_begin=None,
        last_modified_end=None):
    """Read parquet stored in S3 compatible storage and return Pandas
    Dataframe.

    :param path: S3 prefix (accepts Unix shell-style wildcards)
        (e.g. s3://bucket/prefix) or list of S3 objects paths
        (e.g. [s3://bucket/key0, s3://bucket/key1]).
    :param partition_filter: Callback Function filters to apply on PARTITION
        columns (PUSH-DOWN filter). This function MUST receive a single
        argument (Dict[str, str]) where keys are partitions names and values
        are partitions values. Partitions values will be always strings
        extracted from S3. This function MUST return a bool, True to read
        the partition or False to ignore it. Ignored if dataset=False.
    :param columns: Names of columns to read from the file(s).
    :param validate_schema: Check that individual file schemas are all the
        same / compatible. Schemas within a folder prefix should all be the
        same. Disable if you have schemas that are different and want to
        read them anyway.
    :param last_modified_begin: Filter the s3 files by the Last modified
        date of the object. The filter is applied only after list all s3
        files.
    :param last_modified_end: Filter the s3 files by the Last modified date
        of the object. The filter is applied only after list all s3 files.
    :type path: Union[str, List[str]]
    :type partition_filter: Callable[[Dict[str, str]], bool], optional
    :type columns: List[str], optional
    :type validate_schema: bool, optional
    :type last_modified_begin: datetime, optional
    :type last_modified_end: datetime, optional
    :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
    :rtype: DataFrame
    """
    df = None
    start = time()
    try:
        df = wr.s3.read_parquet(
            path=path,
            path_suffix="parquet",
            # dataset=True is needed for partition_filter to take effect
            # (see the parameter description above) — confirm against the
            # awswrangler version in use.
            dataset=True,
            columns=columns,
            validate_schema=validate_schema,
            partition_filter=partition_filter,
            last_modified_begin=last_modified_begin,
            last_modified_end=last_modified_end
        )
        df.info(verbose=True, memory_usage='deep')
        logging.info(
            f"Creation of dataframe {path} took: {time() - start}"
        )
    except NoFilesFound as err:
        # Best-effort: an empty/missing dataset is logged, not raised.
        logging.error(f"No parquets found.\n{err}")
    except EmptyDataFrame as err:
        logging.error(f"No data.\n{err}")
    return df
def read_stats(self):
    """Read Suite Result Analysis data partition from parquet.

    :returns: Pandas DataFrame with the SRA statistics data, or None if it
        cannot be fetched.
    :rtype: DataFrame
    """
    return self._create_dataframe_from_parquet(
        path=self._get_path("statistics"),
        # Push-down filter: read only the "sra" stats_type partition.
        partition_filter=lambda part: part["stats_type"] == "sra",
        columns=None  # Get all columns.
    )
def read_trending_mrr(self):
    """Read MRR data partition from trending parquet.

    :returns: Pandas DataFrame with the trending MRR data, or None if it
        cannot be fetched.
    :rtype: DataFrame
    """
    return self._create_dataframe_from_parquet(
        path=self._get_path("trending-mrr"),
        # Push-down filter: read only the "mrr" test_type partition.
        partition_filter=lambda part: part["test_type"] == "mrr",
        columns=self._get_columns("trending-mrr")
    )
def read_trending_ndrpdr(self):
    """Read NDRPDR data partition from trending parquet.

    :returns: Pandas DataFrame with the trending NDRPDR data, or None if
        it cannot be fetched.
    :rtype: DataFrame
    """
    return self._create_dataframe_from_parquet(
        path=self._get_path("trending-ndrpdr"),
        # Push-down filter: read only the "ndrpdr" test_type partition.
        partition_filter=lambda part: part["test_type"] == "ndrpdr",
        columns=self._get_columns("trending-ndrpdr")
    )
def read_iterative_mrr(self):
    """Read MRR data partition from iterative parquet.

    :returns: Pandas DataFrame with the iterative MRR data, or None if it
        cannot be fetched.
    :rtype: DataFrame
    """
    return self._create_dataframe_from_parquet(
        path=self._get_path("iterative-mrr"),
        # Push-down filter: read only the "mrr" test_type partition.
        partition_filter=lambda part: part["test_type"] == "mrr",
        columns=self._get_columns("iterative-mrr")
    )
def read_iterative_ndrpdr(self):
    """Read NDRPDR data partition from iterative parquet.

    :returns: Pandas DataFrame with the iterative NDRPDR data, or None if
        it cannot be fetched.
    :rtype: DataFrame
    """
    return self._create_dataframe_from_parquet(
        path=self._get_path("iterative-ndrpdr"),
        # Push-down filter: read only the "ndrpdr" test_type partition.
        partition_filter=lambda part: part["test_type"] == "ndrpdr",
        columns=self._get_columns("iterative-ndrpdr")
    )