# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prepare data for Plotly Dash."""

import logging

import awswrangler as wr

from time import time
from yaml import load, FullLoader, YAMLError
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound


class Data:
    """Get data from parquets and store it for use by Dash applications."""

    def __init__(self, data_spec_file, debug=False):
        """Store the inputs and read the data specification file.

        :raises RuntimeError: If the data spec file cannot be opened or
            parsed.
        """

        # Inputs:
        self._data_spec_file = data_spec_file
        self._debug = debug

        # Specification of data to be read from parquets:
        self._data_spec = None

        # Data frame to keep the data:
        self._data = None

        # Read the data specification:
        try:
            with open(self._data_spec_file, "r") as file_read:
                self._data_spec = load(file_read, Loader=FullLoader)
        except IOError as err:
            raise RuntimeError(
                f"Not possible to open the file {self._data_spec_file}\n{err}"
            )
        except YAMLError as err:
            raise RuntimeError(
                f"An error occurred while parsing the specification file "
                f"{self._data_spec_file}\n{err}"
            )

    @property
    def data(self):
        """Return the last dataframe read from parquet."""
        return self._data

    def _get_columns(self, parquet):
        """Return the list of columns defined for the given parquet."""
        try:
            return self._data_spec[parquet]["columns"]
        except KeyError as err:
            raise RuntimeError(
                f"The parquet {parquet} is not defined in the specification "
                f"file {self._data_spec_file} or it does not have any columns "
                f"specified.\n{err}"
            )

    def _get_path(self, parquet):
        """Return the S3 path defined for the given parquet."""
        try:
            return self._data_spec[parquet]["path"]
        except KeyError as err:
            raise RuntimeError(
                f"The parquet {parquet} is not defined in the specification "
                f"file {self._data_spec_file} or it does not have the path "
                f"specified.\n{err}"
            )
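
    # The getters above assume a data specification of roughly the following
    # shape. This is an illustrative sketch only: the top-level keys and the
    # "path"/"columns" subkeys are implied by this class, while the bucket
    # name and column names below are hypothetical.
    #
    #   statistics:
    #     path: s3://example-bucket/stats
    #   trending-mrr:
    #     path: s3://example-bucket/trending
    #     columns:
    #       - job
    #       - build
    #       - test_id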

    def _create_dataframe_from_parquet(self,
        path, partition_filter=None, columns=None,
        validate_schema=False, last_modified_begin=None,
        last_modified_end=None):
        """Read parquet stored in S3 compatible storage and return a Pandas
        DataFrame.

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param partition_filter: Callback function filter to apply on
            PARTITION columns (PUSH-DOWN filter). This function MUST receive
            a single argument (Dict[str, str]) where keys are partition names
            and values are partition values. Partition values are always
            strings extracted from S3. This function MUST return a bool, True
            to read the partition or False to ignore it. Ignored if
            dataset=False.
        :param columns: Names of columns to read from the file(s).
        :param validate_schema: Check that individual file schemas are all the
            same / compatible. Schemas within a folder prefix should all be
            the same. Disable if you have schemas that are different and want
            to disable this check.
        :param last_modified_begin: Filter the S3 files by the last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param last_modified_end: Filter the S3 files by the last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :type path: Union[str, List[str]]
        :type partition_filter: Callable[[Dict[str, str]], bool], optional
        :type columns: List[str], optional
        :type validate_schema: bool, optional
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :returns: Pandas DataFrame or None if the DataFrame cannot be fetched.
        :rtype: DataFrame
        """
        df = None
        start = time()
        try:
            df = wr.s3.read_parquet(
                path=path,
                path_suffix="parquet",
                validate_schema=validate_schema,
                dataset=True,
                columns=columns,
                partition_filter=partition_filter,
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end
            )
            if self._debug:
                df.info(verbose=True, memory_usage='deep')
                logging.info(
                    f"Creation of dataframe {path} took: {time() - start}"
                )
        except NoFilesFound as err:
            logging.error(f"No parquets found.\n{err}")
        except EmptyDataFrame as err:
            logging.error(f"No data.\n{err}")

        self._data = df
        return df
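
    # A sketch of how the filters above combine when calling this method
    # directly; illustrative only, the bucket path below is hypothetical:
    #
    #   from datetime import datetime, timezone
    #   df = self._create_dataframe_from_parquet(
    #       path="s3://example-bucket/trending",
    #       partition_filter=lambda part: part["test_type"] == "mrr",
    #       last_modified_begin=datetime(2022, 6, 1, tzinfo=timezone.utc)
    #   )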

    def read_stats(self):
        """Read Suite Result Analysis data partition from parquet."""
        lambda_f = lambda part: part["stats_type"] == "sra"

        return self._create_dataframe_from_parquet(
            path=self._get_path("statistics"),
            partition_filter=lambda_f,
            columns=None  # Get all columns.
        )

    def read_trending_mrr(self):
        """Read MRR data partition from trending parquet."""
        lambda_f = lambda part: part["test_type"] == "mrr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("trending-mrr"),
            partition_filter=lambda_f,
            columns=self._get_columns("trending-mrr")
        )

    def read_trending_ndrpdr(self):
        """Read NDRPDR data partition from trending parquet."""
        lambda_f = lambda part: part["test_type"] == "ndrpdr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("trending-ndrpdr"),
            partition_filter=lambda_f,
            columns=self._get_columns("trending-ndrpdr")
        )

    def read_iterative_mrr(self):
        """Read MRR data partition from iterative parquet."""
        lambda_f = lambda part: part["test_type"] == "mrr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("iterative-mrr"),
            partition_filter=lambda_f,
            columns=self._get_columns("iterative-mrr")
        )

    def read_iterative_ndrpdr(self):
        """Read NDRPDR data partition from iterative parquet."""
        lambda_f = lambda part: part["test_type"] == "ndrpdr"

        return self._create_dataframe_from_parquet(
            path=self._get_path("iterative-ndrpdr"),
            partition_filter=lambda_f,
            columns=self._get_columns("iterative-ndrpdr")
        )
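

# A minimal usage sketch, assuming a spec file named "data.yaml" exists next
# to this module (the file name is hypothetical); the guard keeps this from
# running on import.
if __name__ == "__main__":
    data = Data(data_spec_file="data.yaml", debug=True)
    # Each reader returns a pandas DataFrame, or None if nothing was fetched.
    df_mrr = data.read_trending_mrr()
    if df_mrr is not None:
        print(df_mrr.head())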