# Copyright (c) 2023 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 """Prepare data for Plotly Dash applications.

import logging
import resource

import awswrangler as wr
import pandas as pd
import pyarrow as pa

from yaml import load, FullLoader, YAMLError
from datetime import datetime, timedelta
from time import time
from pytz import UTC  # assumed source of the UTC constant used below
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
from pyarrow.lib import ArrowInvalid, ArrowNotImplementedError

from ..utils.constants import Constants as C

# If True, pyarrow.Schema is generated. See also the condition in the method
# _write_parquet_schema.
# To generate a schema, select only one data set in the data.yaml file.
GENERATE_SCHEMA = False
40 """Gets the data from parquets and stores it for further use by dash
44 def __init__(self, data_spec_file: str) -> None:
45 """Initialize the Data object.
47 :param data_spec_file: Path to file specifying the data to be read from
49 :type data_spec_file: str
50 :raises RuntimeError: if it is not possible to open data_spec_file or it
51 is not a valid yaml file.
55 self._data_spec_file = data_spec_file
57 # Specification of data to be read from parquets:
58 self._data_spec = list()
60 # Data frame to keep the data:
62 "statistics": pd.DataFrame(),
63 "trending": pd.DataFrame(),
64 "iterative": pd.DataFrame(),
65 "coverage": pd.DataFrame()
70 with open(self._data_spec_file, "r") as file_read:
71 self._data_spec = load(file_read, Loader=FullLoader)
72 except IOError as err:
74 f"Not possible to open the file {self._data_spec_file,}\n{err}"
76 except YAMLError as err:
78 f"An error occurred while parsing the specification file "
79 f"{self._data_spec_file,}\n"

    @staticmethod
    def _get_list_of_files(
        path,
        last_modified_begin=None,
        last_modified_end=None,
        days=None
    ) -> list:
        """Get a list of files of interest stored in S3 compatible storage
        and return it.

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param last_modified_begin: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param last_modified_end: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param days: Number of days to filter.
        :type path: Union[str, List[str]]
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :type days: int, optional
        :returns: List of file names.
        :rtype: list
        """
        file_list = list()
        if days:
            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
        try:
            file_list = wr.s3.list_objects(
                path=path,
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end
            )
            logging.debug("\n".join(file_list))
        except NoFilesFound as err:
            logging.error(f"No parquets found.\n{err}")
        except EmptyDataFrame as err:
            logging.error(f"No data.\n{err}")

        return file_list

    def _validate_columns(self, data_type: str) -> str:
        """Check if all defined columns are present in the dataframe.

        :param data_type: The data type defined in the data.yaml file.
        :type data_type: str
        :returns: Error message if the validation fails, otherwise an empty
            string.
        :rtype: str
        """
        defined_columns = set()
        for data_set in self._data_spec:
            if data_set.get("data_type", str()) == data_type:
                defined_columns.update(data_set.get("columns", set()))

        if not defined_columns:
            return "No columns defined in the data set(s)."

        if self.data[data_type].empty:
            return "The dataframe is empty."

        ret_msg = str()
        for col in defined_columns:
            if col not in self.data[data_type].columns:
                if not ret_msg:
                    ret_msg = "Missing columns: "
                else:
                    ret_msg += ", "
                ret_msg += col

        return ret_msg

    @staticmethod
    def _write_parquet_schema(
        path,
        partition_filter=None,
        columns=None,
        validate_schema=False,
        last_modified_begin=None,
        last_modified_end=None,
        days=None
    ) -> None:
        """Auxiliary function to write parquet schemas. Use it instead of
        "_create_dataframe_from_parquet" in "read_all_data".

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param partition_filter: Callback function filter to apply on
            PARTITION columns (PUSH-DOWN filter). This function MUST receive
            a single argument (Dict[str, str]) where keys are partition names
            and values are partition values. Partition values will always be
            strings extracted from S3. This function MUST return a bool, True
            to read the partition or False to ignore it. Ignored if
            dataset=False.
        :param columns: Names of columns to read from the file(s).
        :param validate_schema: Check that individual file schemas are all the
            same / compatible. Schemas within a folder prefix should all be
            the same. Disable if you have schemas that are different and want
            to skip this check.
        :param last_modified_begin: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param last_modified_end: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param days: Number of days to filter.
        :type path: Union[str, List[str]]
        :type partition_filter: Callable[[Dict[str, str]], bool], optional
        :type columns: List[str], optional
        :type validate_schema: bool, optional
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :type days: int, optional
        """
        if days:
            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)

        df = wr.s3.read_parquet(
            path=path,
            path_suffix="parquet",
            validate_schema=validate_schema,
            dataset=True,
            columns=columns,
            partition_filter=partition_filter,
            last_modified_begin=last_modified_begin,
            last_modified_end=last_modified_end,
            chunked=1
        )

        for itm in df:
            # Specify the condition or remove it:
            if all((
                pd.api.types.is_string_dtype(itm["column_name"]),
                pd.api.types.is_string_dtype(itm["telemetry"][0])
            )):
                schema = pa.Schema.from_pandas(itm)
                pa.parquet.write_metadata(
                    schema, f"{C.PATH_TO_SCHEMAS}_tmp_schema"
                )
                logging.info(schema.to_string(
                    truncate_metadata=False,
                    show_field_metadata=True,
                    show_schema_metadata=True
                ))
                break

    @staticmethod
    def _create_dataframe_from_parquet(
        path,
        partition_filter=None,
        columns=None,
        validate_schema=False,
        last_modified_begin=None,
        last_modified_end=None,
        days=None,
        schema=None
    ) -> pd.DataFrame:
        """Read parquets stored in S3 compatible storage and return a pandas
        DataFrame.

        :param path: S3 prefix (accepts Unix shell-style wildcards)
            (e.g. s3://bucket/prefix) or list of S3 object paths
            (e.g. [s3://bucket/key0, s3://bucket/key1]).
        :param partition_filter: Callback function filter to apply on
            PARTITION columns (PUSH-DOWN filter). This function MUST receive
            a single argument (Dict[str, str]) where keys are partition names
            and values are partition values. Partition values will always be
            strings extracted from S3. This function MUST return a bool, True
            to read the partition or False to ignore it. Ignored if
            dataset=False.
        :param columns: Names of columns to read from the file(s).
        :param validate_schema: Check that individual file schemas are all the
            same / compatible. Schemas within a folder prefix should all be
            the same. Disable if you have schemas that are different and want
            to skip this check.
        :param last_modified_begin: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param last_modified_end: Filter the S3 files by the Last modified
            date of the object. The filter is applied only after listing all
            S3 files.
        :param days: Number of days to filter.
        :param schema: Path to the schema to use when reading data from the
            parquet.
        :type path: Union[str, List[str]]
        :type partition_filter: Callable[[Dict[str, str]], bool], optional
        :type columns: List[str], optional
        :type validate_schema: bool, optional
        :type last_modified_begin: datetime, optional
        :type last_modified_end: datetime, optional
        :type days: int, optional
        :type schema: str, optional
        :returns: Pandas DataFrame or None if the DataFrame cannot be fetched.
        :rtype: pandas.DataFrame or None
        """
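        # A minimal sketch of a partition filter, assuming a partition column
        # named "test_type" with value "mrr" (both hypothetical). The real
        # filters are built from the data specification in read_all_data():
        #
        #     partition_filter = lambda part: part["test_type"] == "mrr"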
        df = None
        start = time()
        if days:
            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
        try:
            df = wr.s3.read_parquet(
                path=path,
                path_suffix="parquet",
                schema=schema,
                validate_schema=validate_schema,
                dataset=True,
                columns=columns,
                partition_filter=partition_filter,
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end,
                dtype_backend="pyarrow"
            )
            df.info(verbose=True, memory_usage="deep")
            logging.debug(
                f"\nCreation of dataframe {path} took: {time() - start}\n"
            )
        except (ArrowInvalid, ArrowNotImplementedError) as err:
            logging.error(f"Reading of data from parquets FAILED.\n{repr(err)}")
        except NoFilesFound as err:
            logging.error(
                f"Reading of data from parquets FAILED.\n"
                f"No parquets found in the specified time period.\n"
                f"Nr of days: {days}\n"
                f"last_modified_begin: {last_modified_begin}\n"
                f"{err}"
            )
        except EmptyDataFrame as err:
            logging.error(
                f"Reading of data from parquets FAILED.\n"
                f"No data in parquets in the specified time period.\n"
                f"Nr of days: {days}\n"
                f"last_modified_begin: {last_modified_begin}\n"
                f"{err}"
            )

        return df

    def read_all_data(self, days: int = None) -> dict:
        """Read all data necessary for all applications.

        :param days: Number of days to filter. If None, all data will be
            read.
        :type days: int, optional
        :returns: A dictionary where keys are names of parquets and values are
            the pandas dataframes with the fetched data.
        :rtype: dict(str: pandas.DataFrame)
        """

        data_lists = {
            "statistics": list(),
            "trending": list(),
            "iterative": list(),
            "coverage": list()
        }
347 logging.info("\n\nReading data:\n" + "-" * 13 + "\n")
348 for data_set in self._data_spec:
350 f"\n\nReading data for {data_set['data_type']} "
351 f"{data_set['partition_name']} {data_set.get('release', '')}\n"
353 schema_file = data_set.get("schema", None)
356 schema = pa.parquet.read_schema(
357 f"{C.PATH_TO_SCHEMAS}{schema_file}"
359 except FileNotFoundError as err:
360 logging.error(repr(err))
361 logging.error("Proceeding without schema.")
365 partition_filter = lambda part: True \
366 if part[data_set["partition"]] == data_set["partition_name"] \
368 if data_set["data_type"] in ("trending", "statistics"):
375 Data._write_parquet_schema(
376 path=data_set["path"],
377 partition_filter=partition_filter,
378 columns=data_set.get("columns", None),
384 data = Data._create_dataframe_from_parquet(
385 path=data_set["path"],
386 partition_filter=partition_filter,
387 columns=data_set.get("columns", None),
391 if data_set["data_type"] in ("iterative", "coverage"):
392 data["release"] = data_set["release"]
393 data["release"] = data["release"].astype("category")
395 data_lists[data_set["data_type"]].append(data)
398 "\n\nData post-processing, validation and summary:\n" +
401 for key in self._data.keys():
402 logging.info(f"\n\nDataframe {key}:\n")
403 self._data[key] = pd.concat(
408 self._data[key].info(verbose=True, memory_usage="deep")
409 err_msg = self._validate_columns(key)
411 self._data[key] = pd.DataFrame()
413 f"Data validation FAILED.\n"
415 "Generated dataframe replaced by an empty dataframe."
418 mem_alloc = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
419 logging.info(f"\n\nMemory allocation: {mem_alloc:.0f}MB\n")
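

# A minimal usage sketch (illustrative; the spec file name and the consuming
# Dash application are assumptions, not part of this module):
#
#     data = Data(data_spec_file="data.yaml")
#     frames = data.read_all_data(days=180)
#     trending = frames["trending"]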