# Copyright (c) 2023 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

14 """Prepare data for Plotly Dash applications.
import logging
import resource

import awswrangler as wr
import pandas as pd
import pyarrow as pa

from yaml import load, FullLoader, YAMLError
from datetime import datetime, timedelta
from time import time
from pytz import UTC
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
from pyarrow.lib import ArrowInvalid, ArrowNotImplementedError

from ..utils.constants import Constants as C
34 """Gets the data from parquets and stores it for further use by dash

    def __init__(self, data_spec_file: str) -> None:
        """Initialize the Data object.

        :param data_spec_file: Path to the file specifying the data to be
            read from parquets.
        :type data_spec_file: str
        :raises RuntimeError: If it is not possible to open data_spec_file or
            it is not a valid yaml file.
        """
        self._data_spec_file = data_spec_file

        # Specification of data to be read from parquets:
        self._data_spec = list()

        # Data frame to keep the data:
        self._data = {
            "statistics": pd.DataFrame(),
            "trending": pd.DataFrame(),
            "iterative": pd.DataFrame(),
            "coverage": pd.DataFrame()
        }
        # Read the specification of data from the yaml file:
        try:
            with open(self._data_spec_file, "r") as file_read:
                self._data_spec = load(file_read, Loader=FullLoader)
        except IOError as err:
            raise RuntimeError(
                f"Not possible to open the file {self._data_spec_file}\n{err}"
            )
        except YAMLError as err:
            raise RuntimeError(
                f"An error occurred while parsing the specification file "
                f"{self._data_spec_file}\n"
                f"{err}"
            )

    @staticmethod
    def _get_list_of_files(
        path,
        last_modified_begin=None,
        last_modified_end=None,
        days=None
    ) -> list:
        """Get a list of files of interest stored in S3-compatible storage
        and return it as a list of file names.

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param last_modified_begin: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param last_modified_end: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param days: Number of days to filter.
        :type path: Union[str, List[str]]
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :type days: int, optional
        :returns: List of file names.
        :rtype: List[str]
        """
        file_list = list()
        if days:
            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
        try:
            file_list = wr.s3.list_objects(
                path=path,
                suffix="parquet",
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end
            )
            logging.debug("\n".join(file_list))
        except NoFilesFound as err:
            logging.error(f"No parquets found.\n{err}")
        except EmptyDataFrame as err:
            logging.error(f"No data.\n{err}")

        return file_list
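
    # A minimal usage sketch (hypothetical bucket and prefix; not part of the
    # original module):
    #
    #     files = Data._get_list_of_files(
    #         path="s3://<bucket>/csit/parquet/trending",
    #         days=7
    #     )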

    def _validate_columns(self, data_type: str) -> str:
        """Check if all defined columns are present in the dataframe.

        :param data_type: The data type defined in data.yaml.
        :type data_type: str
        :returns: Error message if validation fails, otherwise empty string.
        :rtype: str
        """
        defined_columns = set()
        for data_set in self._data_spec:
            if data_set.get("data_type", str()) == data_type:
                defined_columns.update(data_set.get("columns", set()))

        if not defined_columns:
            return "No columns defined in the data set(s)."

        if self.data[data_type].empty:
            return "No data."

        ret_msg = str()
        for col in defined_columns:
            if col not in self.data[data_type].columns:
                if not ret_msg:
                    ret_msg = "Missing columns: "
                else:
                    ret_msg += ", "
                ret_msg += f"{col}"
        return ret_msg

    @staticmethod
    def _write_parquet_schema(
        path,
        partition_filter=None,
        columns=None,
        validate_schema=False,
        last_modified_begin=None,
        last_modified_end=None,
        days=None
    ) -> None:
        """Auxiliary function to write parquet schemas. Use it instead of
        "_create_dataframe_from_parquet" in "read_all_data".

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param partition_filter: Callback function filter to apply on
            PARTITION columns (PUSH-DOWN filter). This function MUST receive
            a single argument (Dict[str, str]) where keys are partition names
            and values are partition values. Partition values will always be
            strings extracted from S3. This function MUST return a bool, True
            to read the partition or False to ignore it. Ignored if
            dataset=False.
        :param columns: Names of columns to read from the file(s).
        :param validate_schema: Check that individual file schemas are all the
            same / compatible. Schemas within a folder prefix should all be
            the same. Disable if you have schemas that are different and want
            to disable this check.
        :param last_modified_begin: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param last_modified_end: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param days: Number of days to filter.
        :type path: Union[str, List[str]]
        :type partition_filter: Callable[[Dict[str, str]], bool], optional
        :type columns: List[str], optional
        :type validate_schema: bool, optional
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :type days: int, optional
        """
        if days:
            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)

        df = wr.s3.read_parquet(
            path=path,
            path_suffix="parquet",
            dataset=True,
            columns=columns,
            validate_schema=validate_schema,
            chunked=1,
            partition_filter=partition_filter,
            last_modified_begin=last_modified_begin,
            last_modified_end=last_modified_end
        )

        for itm in df:
            # Specify the condition or remove it:
            if pd.api.types.is_string_dtype(itm["result_rate_unit"]):
                print(pa.Schema.from_pandas(itm))
                pa.parquet.write_metadata(
                    pa.Schema.from_pandas(itm),
                    f"{C.PATH_TO_SCHEMAS}_tmp_schema"
                )
                break

    @staticmethod
    def _create_dataframe_from_parquet(
        path,
        partition_filter=None,
        columns=None,
        validate_schema=False,
        last_modified_begin=None,
        last_modified_end=None,
        days=None,
        schema=None
    ) -> pd.DataFrame:
        """Read parquet files stored in S3-compatible storage and return a
        Pandas DataFrame.

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param partition_filter: Callback function filter to apply on
            PARTITION columns (PUSH-DOWN filter). This function MUST receive
            a single argument (Dict[str, str]) where keys are partition names
            and values are partition values. Partition values will always be
            strings extracted from S3. This function MUST return a bool, True
            to read the partition or False to ignore it. Ignored if
            dataset=False.
        :param columns: Names of columns to read from the file(s).
        :param validate_schema: Check that individual file schemas are all the
            same / compatible. Schemas within a folder prefix should all be
            the same. Disable if you have schemas that are different and want
            to disable this check.
        :param last_modified_begin: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param last_modified_end: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param days: Number of days to filter.
        :param schema: Schema to use when reading data from the parquet.
        :type path: Union[str, List[str]]
        :type partition_filter: Callable[[Dict[str, str]], bool], optional
        :type columns: List[str], optional
        :type validate_schema: bool, optional
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :type days: int, optional
        :type schema: pyarrow.Schema, optional
        :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
        :rtype: pandas.DataFrame
        """
        df = None
        start = time()
        if days:
            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
        try:
            df = wr.s3.read_parquet(
                path=path,
                path_suffix="parquet",
                dataset=True,
                columns=columns,
                validate_schema=validate_schema,
                schema=schema,
                partition_filter=partition_filter,
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end,
                dtype_backend="pyarrow"
            )
            df.info(verbose=True, memory_usage="deep")
            logging.debug(
                f"\nCreation of dataframe {path} took: {time() - start}\n"
            )
        except (ArrowInvalid, ArrowNotImplementedError) as err:
            logging.error(f"Reading of data from parquets FAILED.\n{repr(err)}")
        except NoFilesFound as err:
            logging.error(
                f"Reading of data from parquets FAILED.\n"
                f"No parquets found in the specified time period.\n"
                f"Nr of days: {days}\n"
                f"last_modified_begin: {last_modified_begin}\n"
                f"last_modified_end: {last_modified_end}\n"
                f"{repr(err)}"
            )
        except EmptyDataFrame as err:
            logging.error(
                f"Reading of data from parquets FAILED.\n"
                f"No data in parquets in the specified time period.\n"
                f"Nr of days: {days}\n"
                f"last_modified_begin: {last_modified_begin}\n"
                f"last_modified_end: {last_modified_end}\n"
                f"{repr(err)}"
            )

        return df

    def read_all_data(self, days: int = None) -> dict:
        """Read all data necessary for all applications.

        :param days: Number of days to filter. If None, all data will be
            downloaded.
        :type days: int, optional
        :returns: A dictionary where keys are names of parquets and values are
            the pandas dataframes with fetched data.
        :rtype: dict(str: pandas.DataFrame)
        """
329 "statistics": list(),
335 logging.info("\n\nReading data:\n" + "-" * 13 + "\n")
336 for data_set in self._data_spec:
338 f"\n\nReading data for {data_set['data_type']} "
339 f"{data_set['partition_name']} {data_set.get('release', '')}\n"
341 schema_file = data_set.get("schema", None)
344 schema = pa.parquet.read_schema(
345 f"{C.PATH_TO_SCHEMAS}{schema_file}"
347 except FileNotFoundError as err:
348 logging.error(repr(err))
349 logging.error("Proceeding without schema.")
353 partition_filter = lambda part: True \
354 if part[data_set["partition"]] == data_set["partition_name"] \
356 if data_set["data_type"] in ("trending", "statistics"):
360 data = Data._create_dataframe_from_parquet(
361 path=data_set["path"],
362 partition_filter=partition_filter,
363 columns=data_set.get("columns", None),
367 if data_set["data_type"] in ("iterative", "coverage"):
368 data["release"] = data_set["release"]
369 data["release"] = data["release"].astype("category")
371 data_lists[data_set["data_type"]].append(data)
374 "\n\nData post-processing, validation and summary:\n" +
377 for key in self._data.keys():
378 logging.info(f"\n\nDataframe {key}:\n")
379 self._data[key] = pd.concat(
384 self._data[key].info(verbose=True, memory_usage="deep")
385 err_msg = self._validate_columns(key)
387 self._data[key] = pd.DataFrame()
389 f"Data validation FAILED.\n"
391 "Generated dataframe replaced by an empty dataframe."
394 mem_alloc = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
395 logging.info(f"\n\nMemory allocation: {mem_alloc:.0f}MB\n")
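

# A minimal usage sketch, assuming a hypothetical specification file
# "data.yaml" whose entries carry the keys consumed by read_all_data
# (illustrative values only):
#
#     - data_type: trending
#       partition: test_type
#       partition_name: mrr
#       path: s3://<bucket>/csit/parquet/trending
#       columns:
#         - job
#         - build
#         - start_time
#
# With such a file in place, the data could be fetched as:
#
#     data = Data(data_spec_file="data.yaml")
#     dataframes = data.read_all_data(days=30)
#     dataframes["trending"].info()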