1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Prepare data for Plotly Dash applications.
19 import awswrangler as wr
23 from yaml import load, FullLoader, YAMLError
24 from datetime import datetime, timedelta
27 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
28 from pyarrow.lib import ArrowInvalid, ArrowNotImplementedError
30 from ..utils.constants import Constants as C
33 # If True, pyarrow.Schema is generated. See also condition in the method
34 # _write_parquet_schema.
35 # To generate schema, select only one data set in data.yaml file.
36 GENERATE_SCHEMA = False
40 """Gets the data from parquets and stores it for further use by dash
# NOTE(review): elided listing — the leading numbers are original file line
# numbers; gaps in them mark lines missing from this view (e.g. the dict-open
# line before 62 and the try:/raise lines around 70-79).
44 def __init__(self, data_spec_file: str) -> None:
45 """Initialize the Data object.
47 :param data_spec_file: Path to file specifying the data to be read from
49 :type data_spec_file: str
50 :raises RuntimeError: if it is not possible to open data_spec_file or it
51 is not a valid yaml file.
# Remember the spec path so the error messages below can reference it.
55 self._data_spec_file = data_spec_file
57 # Specification of data to be read from parquets:
58 self._data_spec = list()
60 # Data frame to keep the data:
# Presumably assigned into self._data (the dict-open line is elided) —
# read_all_data below iterates self._data.keys(); verify against full file.
62 "statistics": pd.DataFrame(),
63 "trending": pd.DataFrame(),
64 "iterative": pd.DataFrame(),
65 "coverage": pd.DataFrame()
# The enclosing try: line is elided; failures fall through to the handlers.
70 with open(self._data_spec_file, "r") as file_read:
71 self._data_spec = load(file_read, Loader=FullLoader)
72 except IOError as err:
# BUG(review): the trailing comma in {self._data_spec_file,} makes the
# f-string format a 1-tuple, rendering e.g. "('path',)" — drop the comma.
74 f"Not possible to open the file {self._data_spec_file,}\n{err}"
76 except YAMLError as err:
# BUG(review): same 1-tuple f-string bug on the next line.
78 f"An error occurred while parsing the specification file "
79 f"{self._data_spec_file,}\n"
# NOTE(review): elided listing — leading numbers are original line numbers;
# the `path` and `days` parameters (lines 89, 92-93) and the try:/return
# lines are missing from this view. Likely a @staticmethod (no self) —
# decorator line not visible; confirm against the full file.
88 def _get_list_of_files(
90 last_modified_begin=None,
91 last_modified_end=None,
94 """Get list of interested files stored in S3 compatible storage and
97 :param path: S3 prefix (accepts Unix shell-style wildcards)
98 (e.g. s3://bucket/prefix) or list of S3 objects paths
99 (e.g. [s3://bucket/key0, s3://bucket/key1]).
100 :param last_modified_begin: Filter the s3 files by the Last modified
101 date of the object. The filter is applied only after list all s3
103 :param last_modified_end: Filter the s3 files by the Last modified date
104 of the object. The filter is applied only after list all s3 files.
105 :param days: Number of days to filter.
106 :type path: Union[str, List[str]]
107 :type last_modified_begin: datetime, optional
108 :type last_modified_end: datetime, optional
109 :type days: integer, optional
110 :returns: List of file names.
# When days is given (guard line elided) the begin timestamp is derived
# from "now - days", overriding any caller-supplied value — TODO confirm.
115 last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
117 file_list = wr.s3.list_objects(
120 last_modified_begin=last_modified_begin,
121 last_modified_end=last_modified_end
123 logging.debug("\n".join(file_list))
# Failures are only logged; the (elided) return presumably yields the
# possibly-empty file_list — verify.
124 except NoFilesFound as err:
125 logging.error(f"No parquets found.\n{err}")
126 except EmptyDataFrame as err:
127 logging.error(f"No data.\n{err}")
# NOTE(review): elided listing — leading numbers are original line numbers;
# the empty-dataframe return and the message-assembly/return lines are
# missing from this view.
131 def _validate_columns(self, data_type: str) -> str:
132 """Check if all columns are present in the dataframe.
134 :param data_type: The data type defined in data.yaml
136 :returns: Error message if validation fails, otherwise empty string.
# Union of column names across every spec entry with this data type.
139 defined_columns = set()
140 for data_set in self._data_spec:
141 if data_set.get("data_type", str()) == data_type:
142 defined_columns.update(data_set.get("columns", set()))
144 if not defined_columns:
145 return "No columns defined in the data set(s)."
# An empty dataframe is presumably an error too (return line elided).
147 if self.data[data_type].empty:
# Report every defined column absent from the fetched dataframe.
151 for col in defined_columns:
152 if col not in self.data[data_type].columns:
154 ret_msg = "Missing columns: "
# NOTE(review): elided listing — leading numbers are original line numbers;
# the `path`, `columns` and `days` parameters and the chunk loop around
# lines 215-224 are missing from this view. Debug-only helper: used in place
# of _create_dataframe_from_parquet when GENERATE_SCHEMA is True (see the
# module-level comment near that constant).
161 def _write_parquet_schema(
163 partition_filter=None,
165 validate_schema=False,
166 last_modified_begin=None,
167 last_modified_end=None,
170 """Auxiliary function to write parquet schemas. Use it instead of
171 "_create_dataframe_from_parquet" in "read_all_data".
173 :param path: S3 prefix (accepts Unix shell-style wildcards)
174 (e.g. s3://bucket/prefix) or list of S3 objects paths
175 (e.g. [s3://bucket/key0, s3://bucket/key1]).
176 :param partition_filter: Callback Function filters to apply on PARTITION
177 columns (PUSH-DOWN filter). This function MUST receive a single
178 argument (Dict[str, str]) where keys are partitions names and values
179 are partitions values. Partitions values will be always strings
180 extracted from S3. This function MUST return a bool, True to read
181 the partition or False to ignore it. Ignored if dataset=False.
182 :param columns: Names of columns to read from the file(s).
183 :param validate_schema: Check that individual file schemas are all the
184 same / compatible. Schemas within a folder prefix should all be the
185 same. Disable if you have schemas that are different and want to
187 :param last_modified_begin: Filter the s3 files by the Last modified
188 date of the object. The filter is applied only after list all s3
190 :param last_modified_end: Filter the s3 files by the Last modified date
191 of the object. The filter is applied only after list all s3 files.
192 :param days: Number of days to filter.
193 :type path: Union[str, List[str]]
194 :type partition_filter: Callable[[Dict[str, str]], bool], optional
195 :type columns: List[str], optional
196 :type validate_schema: bool, optional
197 :type last_modified_begin: datetime, optional
198 :type last_modified_end: datetime, optional
199 :type days: integer, optional
# When days is given (guard line elided), restrict the read window to the
# last `days` days.
202 last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
204 df = wr.s3.read_parquet(
206 path_suffix="parquet",
208 validate_schema=validate_schema,
212 partition_filter=partition_filter,
213 last_modified_begin=last_modified_begin,
214 last_modified_end=last_modified_end,
# `itm` below is presumably one chunk of df (chunk-loop line elided);
# the hard-coded condition is a template the developer edits per data set.
220 # Specify the condition or remove it:
222 pd.api.types.is_string_dtype(itm["<column_name>"]),
223 pd.api.types.is_string_dtype(itm["telemetry"][0])
# Derive a pyarrow schema from the selected chunk and persist it as a
# "_tmp_schema" metadata file; `pa` (pyarrow) import is elided from view.
225 print(pa.Schema.from_pandas(itm))
226 pa.parquet.write_metadata(
227 pa.Schema.from_pandas(itm),
228 f"{C.PATH_TO_SCHEMAS}_tmp_schema"
# NOTE(review): elided listing — leading numbers are original line numbers;
# the `path`, `columns`, `days` and `schema` parameters, the try:/start-timer
# lines and the final return are missing from this view.
236 def _create_dataframe_from_parquet(
238 partition_filter=None,
240 validate_schema=False,
241 last_modified_begin=None,
242 last_modified_end=None,
246 """Read parquet stored in S3 compatible storage and returns Pandas
249 :param path: S3 prefix (accepts Unix shell-style wildcards)
250 (e.g. s3://bucket/prefix) or list of S3 objects paths
251 (e.g. [s3://bucket/key0, s3://bucket/key1]).
252 :param partition_filter: Callback Function filters to apply on PARTITION
253 columns (PUSH-DOWN filter). This function MUST receive a single
254 argument (Dict[str, str]) where keys are partitions names and values
255 are partitions values. Partitions values will be always strings
256 extracted from S3. This function MUST return a bool, True to read
257 the partition or False to ignore it. Ignored if dataset=False.
258 :param columns: Names of columns to read from the file(s).
259 :param validate_schema: Check that individual file schemas are all the
260 same / compatible. Schemas within a folder prefix should all be the
261 same. Disable if you have schemas that are different and want to
263 :param last_modified_begin: Filter the s3 files by the Last modified
264 date of the object. The filter is applied only after list all s3
266 :param last_modified_end: Filter the s3 files by the Last modified date
267 of the object. The filter is applied only after list all s3 files.
268 :param days: Number of days to filter.
269 :param schema: Path to schema to use when reading data from the parquet.
270 :type path: Union[str, List[str]]
271 :type partition_filter: Callable[[Dict[str, str]], bool], optional
272 :type columns: List[str], optional
273 :type validate_schema: bool, optional
274 :type last_modified_begin: datetime, optional
275 :type last_modified_end: datetime, optional
276 :type days: integer, optional
278 :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
# When days is given (guard line elided), restrict the read window to the
# last `days` days.
284 last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
286 df = wr.s3.read_parquet(
288 path_suffix="parquet",
291 validate_schema=validate_schema,
295 partition_filter=partition_filter,
296 last_modified_begin=last_modified_begin,
297 last_modified_end=last_modified_end,
# pyarrow-backed dtypes cut memory use versus default numpy-backed dtypes.
298 dtype_backend="pyarrow"
# Log size/dtypes and elapsed wall time; `start` and `time` come from
# elided lines — presumably time.time() captured before the read; verify.
301 df.info(verbose=True, memory_usage="deep")
303 f"\nCreation of dataframe {path} took: {time() - start}\n"
# Any failure is logged and, per the docstring, None is the (elided) result.
305 except (ArrowInvalid, ArrowNotImplementedError) as err:
306 logging.error(f"Reading of data from parquets FAILED.\n{repr(err)}")
307 except NoFilesFound as err:
309 f"Reading of data from parquets FAILED.\n"
310 f"No parquets found in specified time period.\n"
311 f"Nr of days: {days}\n"
312 f"last_modified_begin: {last_modified_begin}\n"
315 except EmptyDataFrame as err:
317 f"Reading of data from parquets FAILED.\n"
318 f"No data in parquets in specified time period.\n"
319 f"Nr of days: {days}\n"
320 f"last_modified_begin: {last_modified_begin}\n"
# NOTE(review): elided listing — leading numbers are original line numbers;
# the other data_lists keys (trending/iterative/coverage), the GENERATE_SCHEMA
# branch header, the pd.concat arguments and the final return are missing
# from this view.
326 def read_all_data(self, days: int=None) -> dict:
327 """Read all data necessary for all applications.
329 :param days: Number of days to filter. If None, all data will be
332 :returns: A dictionary where keys are names of parquets and values are
333 the pandas dataframes with fetched data.
334 :rtype: dict(str: pandas.DataFrame)
# Per-type accumulators; only the "statistics" entry is visible here.
338 "statistics": list(),
344 logging.info("\n\nReading data:\n" + "-" * 13 + "\n")
345 for data_set in self._data_spec:
347 f"\n\nReading data for {data_set['data_type']} "
348 f"{data_set['partition_name']} {data_set.get('release', '')}\n"
# Optional per-data-set pyarrow schema; missing file degrades gracefully.
350 schema_file = data_set.get("schema", None)
353 schema = pa.parquet.read_schema(
354 f"{C.PATH_TO_SCHEMAS}{schema_file}"
356 except FileNotFoundError as err:
357 logging.error(repr(err))
358 logging.error("Proceeding without schema.")
# Push-down filter keeping only this data_set's partition.
# NOTE(review): `True if cond else False` is non-idiomatic (just `cond`);
# the lambda late-binds `data_set`, which is harmless only because it is
# consumed within the same loop iteration below.
362 partition_filter = lambda part: True \
363 if part[data_set["partition"]] == data_set["partition_name"] \
365 if data_set["data_type"] in ("trending", "statistics"):
# Debug path: dump a schema instead of reading data (see GENERATE_SCHEMA).
372 Data._write_parquet_schema(
373 path=data_set["path"],
374 partition_filter=partition_filter,
375 columns=data_set.get("columns", None),
381 data = Data._create_dataframe_from_parquet(
382 path=data_set["path"],
383 partition_filter=partition_filter,
384 columns=data_set.get("columns", None),
# Tag release-scoped data sets; category dtype saves memory on the
# highly repetitive release column.
388 if data_set["data_type"] in ("iterative", "coverage"):
389 data["release"] = data_set["release"]
390 data["release"] = data["release"].astype("category")
392 data_lists[data_set["data_type"]].append(data)
395 "\n\nData post-processing, validation and summary:\n" +
# Merge each type's accumulated frames, then validate expected columns;
# on validation failure the dataframe is replaced by an empty one.
398 for key in self._data.keys():
399 logging.info(f"\n\nDataframe {key}:\n")
400 self._data[key] = pd.concat(
405 self._data[key].info(verbose=True, memory_usage="deep")
406 err_msg = self._validate_columns(key)
408 self._data[key] = pd.DataFrame()
410 f"Data validation FAILED.\n"
412 "Generated dataframe replaced by an empty dataframe."
# NOTE(review): ru_maxrss is kilobytes on Linux (so /1000 ~= MB) but BYTES
# on macOS — the reported figure is platform-dependent; confirm target OS.
415 mem_alloc = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
416 logging.info(f"\n\nMemory allocation: {mem_alloc:.0f}MB\n")