2bf36497783a477ef6d93132f769df4f57453ff9
[csit.git] / csit.infra.dash / app / cdash / data / data.py
1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Prepare data for Plotly Dash applications.
15 """
16
17 import logging
18 import resource
19 import awswrangler as wr
20 import pandas as pd
21 import pyarrow as pa
22
23 from yaml import load, FullLoader, YAMLError
24 from datetime import datetime, timedelta
25 from time import time
26 from pytz import UTC
27 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
28 from pyarrow.lib import ArrowInvalid, ArrowNotImplementedError
29
30 from ..utils.constants import Constants as C
31
32
33 class Data:
34     """Gets the data from parquets and stores it for further use by dash
35     applications.
36     """
37
38     def __init__(self, data_spec_file: str) -> None:
39         """Initialize the Data object.
40
41         :param data_spec_file: Path to file specifying the data to be read from
42             parquets.
43         :type data_spec_file: str
44         :raises RuntimeError: if it is not possible to open data_spec_file or it
45             is not a valid yaml file.
46         """
47
48         # Inputs:
49         self._data_spec_file = data_spec_file
50
51         # Specification of data to be read from parquets:
52         self._data_spec = list()
53
54         # Data frame to keep the data:
55         self._data = {
56             "statistics": pd.DataFrame(),
57             "trending": pd.DataFrame(),
58             "iterative": pd.DataFrame(),
59             "coverage": pd.DataFrame()
60         }
61
62         # Read from files:
63         try:
64             with open(self._data_spec_file, "r") as file_read:
65                 self._data_spec = load(file_read, Loader=FullLoader)
66         except IOError as err:
67             raise RuntimeError(
68                 f"Not possible to open the file {self._data_spec_file,}\n{err}"
69             )
70         except YAMLError as err:
71             raise RuntimeError(
72                 f"An error occurred while parsing the specification file "
73                 f"{self._data_spec_file,}\n"
74                 f"{err}"
75             )
76
77     @property
78     def data(self):
79         return self._data
80
81     @staticmethod
82     def _get_list_of_files(
83             path,
84             last_modified_begin=None,
85             last_modified_end=None,
86             days=None
87         ) -> list:
88         """Get list of interested files stored in S3 compatible storage and
89         returns it.
90
91         :param path: S3 prefix (accepts Unix shell-style wildcards)
92             (e.g. s3://bucket/prefix) or list of S3 objects paths
93             (e.g. [s3://bucket/key0, s3://bucket/key1]).
94         :param last_modified_begin: Filter the s3 files by the Last modified
95             date of the object. The filter is applied only after list all s3
96             files.
97         :param last_modified_end: Filter the s3 files by the Last modified date
98             of the object. The filter is applied only after list all s3 files.
99         :param days: Number of days to filter.
100         :type path: Union[str, List[str]]
101         :type last_modified_begin: datetime, optional
102         :type last_modified_end: datetime, optional
103         :type days: integer, optional
104         :returns: List of file names.
105         :rtype: list
106         """
107         file_list = list()
108         if days:
109             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
110         try:
111             file_list = wr.s3.list_objects(
112                 path=path,
113                 suffix="parquet",
114                 last_modified_begin=last_modified_begin,
115                 last_modified_end=last_modified_end
116             )
117             logging.debug("\n".join(file_list))
118         except NoFilesFound as err:
119             logging.error(f"No parquets found.\n{err}")
120         except EmptyDataFrame as err:
121             logging.error(f"No data.\n{err}")
122
123         return file_list
124
125     def _validate_columns(self, data_type: str) -> str:
126         """Check if all columns are present in the dataframe.
127
128         :param data_type: The data type defined in data.yaml
129         :type data_type: str
130         :returns: Error message if validation fails, otherwise empty string.
131         :rtype: str
132         """
133         defined_columns = set()
134         for data_set in self._data_spec:
135             if data_set.get("data_type", str()) == data_type:
136                 defined_columns.update(data_set.get("columns", set()))
137
138         if not defined_columns:
139             return "No columns defined in the data set(s)."
140
141         if self.data[data_type].empty:
142             return "No data."
143
144         ret_msg = str()
145         for col in defined_columns:
146             if col not in self.data[data_type].columns:
147                 if not ret_msg:
148                     ret_msg = "Missing columns: "
149                 else:
150                     ret_msg += ", "
151                 ret_msg += f"{col}"
152         return ret_msg
153
154     @staticmethod
155     def _write_parquet_schema(
156             path,
157             partition_filter=None,
158             columns=None,
159             validate_schema=False,
160             last_modified_begin=None,
161             last_modified_end=None,
162             days=None
163         ) -> None:
164         """Auxiliary function to write parquet schemas. Use it instead of
165         "_create_dataframe_from_parquet" in "read_all_data".
166
167         :param path: S3 prefix (accepts Unix shell-style wildcards)
168             (e.g. s3://bucket/prefix) or list of S3 objects paths
169             (e.g. [s3://bucket/key0, s3://bucket/key1]).
170         :param partition_filter: Callback Function filters to apply on PARTITION
171             columns (PUSH-DOWN filter). This function MUST receive a single
172             argument (Dict[str, str]) where keys are partitions names and values
173             are partitions values. Partitions values will be always strings
174             extracted from S3. This function MUST return a bool, True to read
175             the partition or False to ignore it. Ignored if dataset=False.
176         :param columns: Names of columns to read from the file(s).
177         :param validate_schema: Check that individual file schemas are all the
178             same / compatible. Schemas within a folder prefix should all be the
179             same. Disable if you have schemas that are different and want to
180             disable this check.
181         :param last_modified_begin: Filter the s3 files by the Last modified
182             date of the object. The filter is applied only after list all s3
183             files.
184         :param last_modified_end: Filter the s3 files by the Last modified date
185             of the object. The filter is applied only after list all s3 files.
186         :param days: Number of days to filter.
187         :type path: Union[str, List[str]]
188         :type partition_filter: Callable[[Dict[str, str]], bool], optional
189         :type columns: List[str], optional
190         :type validate_schema: bool, optional
191         :type last_modified_begin: datetime, optional
192         :type last_modified_end: datetime, optional
193         :type days: integer, optional
194         """
195         if days:
196             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
197
198         df = wr.s3.read_parquet(
199             path=path,
200             path_suffix="parquet",
201             ignore_empty=True,
202             validate_schema=validate_schema,
203             use_threads=True,
204             dataset=True,
205             columns=columns,
206             partition_filter=partition_filter,
207             last_modified_begin=last_modified_begin,
208             last_modified_end=last_modified_end,
209             chunked=1
210         )
211
212         for itm in df:
213             try:
214                 # Specify the condition or remove it:
215                 if pd.api.types.is_string_dtype(itm["result_rate_unit"]):
216                     print(pa.Schema.from_pandas(itm))
217                     pa.parquet.write_metadata(
218                         pa.Schema.from_pandas(itm),
219                         f"{C.PATH_TO_SCHEMAS}_tmp_schema"
220                     )
221                     print(itm)
222                     break
223             except KeyError:
224                 pass
225
226     @staticmethod
227     def _create_dataframe_from_parquet(
228             path,
229             partition_filter=None,
230             columns=None,
231             validate_schema=False,
232             last_modified_begin=None,
233             last_modified_end=None,
234             days=None,
235             schema=None
236         ) -> pd.DataFrame:
237         """Read parquet stored in S3 compatible storage and returns Pandas
238         Dataframe.
239
240         :param path: S3 prefix (accepts Unix shell-style wildcards)
241             (e.g. s3://bucket/prefix) or list of S3 objects paths
242             (e.g. [s3://bucket/key0, s3://bucket/key1]).
243         :param partition_filter: Callback Function filters to apply on PARTITION
244             columns (PUSH-DOWN filter). This function MUST receive a single
245             argument (Dict[str, str]) where keys are partitions names and values
246             are partitions values. Partitions values will be always strings
247             extracted from S3. This function MUST return a bool, True to read
248             the partition or False to ignore it. Ignored if dataset=False.
249         :param columns: Names of columns to read from the file(s).
250         :param validate_schema: Check that individual file schemas are all the
251             same / compatible. Schemas within a folder prefix should all be the
252             same. Disable if you have schemas that are different and want to
253             disable this check.
254         :param last_modified_begin: Filter the s3 files by the Last modified
255             date of the object. The filter is applied only after list all s3
256             files.
257         :param last_modified_end: Filter the s3 files by the Last modified date
258             of the object. The filter is applied only after list all s3 files.
259         :param days: Number of days to filter.
260         :param schema: Path to schema to use when reading data from the parquet.
261         :type path: Union[str, List[str]]
262         :type partition_filter: Callable[[Dict[str, str]], bool], optional
263         :type columns: List[str], optional
264         :type validate_schema: bool, optional
265         :type last_modified_begin: datetime, optional
266         :type last_modified_end: datetime, optional
267         :type days: integer, optional
268         :type schema: string
269         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
270         :rtype: DataFrame
271         """
272         df = pd.DataFrame()
273         start = time()
274         if days:
275             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
276         try:
277             df = wr.s3.read_parquet(
278                 path=path,
279                 path_suffix="parquet",
280                 ignore_empty=True,
281                 schema=schema,
282                 validate_schema=validate_schema,
283                 use_threads=True,
284                 dataset=True,
285                 columns=columns,
286                 partition_filter=partition_filter,
287                 last_modified_begin=last_modified_begin,
288                 last_modified_end=last_modified_end,
289                 dtype_backend="pyarrow"
290             )
291
292             df.info(verbose=True, memory_usage="deep")
293             logging.debug(
294                 f"\nCreation of dataframe {path} took: {time() - start}\n"
295             )
296         except (ArrowInvalid, ArrowNotImplementedError) as err:
297             logging.error(f"Reading of data from parquets FAILED.\n{repr(err)}")
298         except NoFilesFound as err:
299             logging.error(
300                 f"Reading of data from parquets FAILED.\n"
301                 f"No parquets found in specified time period.\n"
302                 f"Nr of days: {days}\n"
303                 f"last_modified_begin: {last_modified_begin}\n"
304                 f"{repr(err)}"
305             )
306         except EmptyDataFrame as err:
307             logging.error(
308                 f"Reading of data from parquets FAILED.\n"
309                 f"No data in parquets in specified time period.\n"
310                 f"Nr of days: {days}\n"
311                 f"last_modified_begin: {last_modified_begin}\n"
312                 f"{repr(err)}"
313             )
314
315         return df
316
317     def read_all_data(self, days: int=None) -> dict:
318         """Read all data necessary for all applications.
319
320         :param days: Number of days to filter. If None, all data will be
321             downloaded.
322         :type days: int
323         :returns: A dictionary where keys are names of parquets and values are
324             the pandas dataframes with fetched data.
325         :rtype: dict(str: pandas.DataFrame)
326         """
327
328         data_lists = {
329             "statistics": list(),
330             "trending": list(),
331             "iterative": list(),
332             "coverage": list()
333         }
334
335         logging.info("\n\nReading data:\n" + "-" * 13 + "\n")
336         for data_set in self._data_spec:
337             logging.info(
338                 f"\n\nReading data for {data_set['data_type']} "
339                 f"{data_set['partition_name']} {data_set.get('release', '')}\n"
340             )
341             schema_file = data_set.get("schema", None)
342             if schema_file:
343                 try:
344                     schema = pa.parquet.read_schema(
345                         f"{C.PATH_TO_SCHEMAS}{schema_file}"
346                     )
347                 except FileNotFoundError as err:
348                     logging.error(repr(err))
349                     logging.error("Proceeding without schema.")
350                     schema = None
351             else:
352                 schema = None
353             partition_filter = lambda part: True \
354                 if part[data_set["partition"]] == data_set["partition_name"] \
355                     else False
356             if data_set["data_type"] in ("trending", "statistics"):
357                 time_period = days
358             else:
359                 time_period = None
360             data = Data._create_dataframe_from_parquet(
361                 path=data_set["path"],
362                 partition_filter=partition_filter,
363                 columns=data_set.get("columns", None),
364                 days=time_period,
365                 schema=schema
366             )
367             if data_set["data_type"] in ("iterative", "coverage"):
368                 data["release"] = data_set["release"]
369                 data["release"] = data["release"].astype("category")
370
371             data_lists[data_set["data_type"]].append(data)
372
373         logging.info(
374             "\n\nData post-processing, validation and summary:\n" +
375             "-" * 45 + "\n"
376         )
377         for key in self._data.keys():
378             logging.info(f"\n\nDataframe {key}:\n")
379             self._data[key] = pd.concat(
380                 data_lists[key],
381                 ignore_index=True,
382                 copy=False
383             )    
384             self._data[key].info(verbose=True, memory_usage="deep")
385             err_msg = self._validate_columns(key)
386             if err_msg:
387                 self._data[key] = pd.DataFrame()
388                 logging.error(
389                     f"Data validation FAILED.\n"
390                     f"{err_msg}\n"
391                     "Generated dataframe replaced by an empty dataframe."
392                 )
393
394         mem_alloc = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
395         logging.info(f"\n\nMemory allocation: {mem_alloc:.0f}MB\n")
396
397         return self._data