8537cd8db1285092255bd7106532f2e9a0d81a7b
[csit.git] / csit.infra.dash / app / cdash / data / data.py
1 # Copyright (c) 2023 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """Prepare data for Plotly Dash applications.
15 """
16
17 import logging
18 import resource
19 import awswrangler as wr
20 import pandas as pd
21
22 from yaml import load, FullLoader, YAMLError
23 from datetime import datetime, timedelta
24 from time import time
25 from pytz import UTC
26 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
27
28
29 class Data:
30     """Gets the data from parquets and stores it for further use by dash
31     applications.
32     """
33
34     def __init__(self, data_spec_file: str) -> None:
35         """Initialize the Data object.
36
37         :param data_spec_file: Path to file specifying the data to be read from
38             parquets.
39         :type data_spec_file: str
40         :raises RuntimeError: if it is not possible to open data_spec_file or it
41             is not a valid yaml file.
42         """
43
44         # Inputs:
45         self._data_spec_file = data_spec_file
46
47         # Specification of data to be read from parquets:
48         self._data_spec = list()
49
50         # Data frame to keep the data:
51         self._data = pd.DataFrame()
52
53         # Read from files:
54         try:
55             with open(self._data_spec_file, "r") as file_read:
56                 self._data_spec = load(file_read, Loader=FullLoader)
57         except IOError as err:
58             raise RuntimeError(
59                 f"Not possible to open the file {self._data_spec_file,}\n{err}"
60             )
61         except YAMLError as err:
62             raise RuntimeError(
63                 f"An error occurred while parsing the specification file "
64                 f"{self._data_spec_file,}\n"
65                 f"{err}"
66             )
67
68     @property
69     def data(self):
70         return self._data
71
72     def _get_list_of_files(self,
73         path,
74         last_modified_begin=None,
75         last_modified_end=None,
76         days=None) -> list:
77         """Get list of interested files stored in S3 compatible storage and
78         returns it.
79
80         :param path: S3 prefix (accepts Unix shell-style wildcards)
81             (e.g. s3://bucket/prefix) or list of S3 objects paths
82             (e.g. [s3://bucket/key0, s3://bucket/key1]).
83         :param last_modified_begin: Filter the s3 files by the Last modified
84             date of the object. The filter is applied only after list all s3
85             files.
86         :param last_modified_end: Filter the s3 files by the Last modified date
87             of the object. The filter is applied only after list all s3 files.
88         :param days: Number of days to filter.
89         :type path: Union[str, List[str]]
90         :type last_modified_begin: datetime, optional
91         :type last_modified_end: datetime, optional
92         :type days: integer, optional
93         :returns: List of file names.
94         :rtype: list
95         """
96         file_list = list()
97         if days:
98             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
99         try:
100             file_list = wr.s3.list_objects(
101                 path=path,
102                 suffix="parquet",
103                 last_modified_begin=last_modified_begin,
104                 last_modified_end=last_modified_end
105             )
106             logging.debug("\n".join(file_list))
107         except NoFilesFound as err:
108             logging.error(f"No parquets found.\n{err}")
109         except EmptyDataFrame as err:
110             logging.error(f"No data.\n{err}")
111
112         return file_list
113
114     def _create_dataframe_from_parquet(
115             self,
116             path, partition_filter=None,
117             columns=None,
118             categories=list(),
119             validate_schema=False,
120             last_modified_begin=None,
121             last_modified_end=None,
122             days=None
123         ) -> pd.DataFrame:
124         """Read parquet stored in S3 compatible storage and returns Pandas
125         Dataframe.
126
127         :param path: S3 prefix (accepts Unix shell-style wildcards)
128             (e.g. s3://bucket/prefix) or list of S3 objects paths
129             (e.g. [s3://bucket/key0, s3://bucket/key1]).
130         :param partition_filter: Callback Function filters to apply on PARTITION
131             columns (PUSH-DOWN filter). This function MUST receive a single
132             argument (Dict[str, str]) where keys are partitions names and values
133             are partitions values. Partitions values will be always strings
134             extracted from S3. This function MUST return a bool, True to read
135             the partition or False to ignore it. Ignored if dataset=False.
136         :param columns: Names of columns to read from the file(s).
137         :param categories: List of columns names that should be returned as
138             pandas.Categorical.
139         :param validate_schema: Check that individual file schemas are all the
140             same / compatible. Schemas within a folder prefix should all be the
141             same. Disable if you have schemas that are different and want to
142             disable this check.
143         :param last_modified_begin: Filter the s3 files by the Last modified
144             date of the object. The filter is applied only after list all s3
145             files.
146         :param last_modified_end: Filter the s3 files by the Last modified date
147             of the object. The filter is applied only after list all s3 files.
148         :param days: Number of days to filter.
149         :type path: Union[str, List[str]]
150         :type partition_filter: Callable[[Dict[str, str]], bool], optional
151         :type columns: List[str], optional
152         :type categories: List[str], optional
153         :type validate_schema: bool, optional
154         :type last_modified_begin: datetime, optional
155         :type last_modified_end: datetime, optional
156         :type days: integer, optional
157         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
158         :rtype: DataFrame
159         """
160         df = None
161         start = time()
162         if days:
163             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
164         try:
165             df = wr.s3.read_parquet(
166                 path=path,
167                 path_suffix="parquet",
168                 ignore_empty=True,
169                 validate_schema=validate_schema,
170                 use_threads=True,
171                 dataset=True,
172                 columns=columns,
173                 # categories=categories,
174                 partition_filter=partition_filter,
175                 last_modified_begin=last_modified_begin,
176                 last_modified_end=last_modified_end
177             )
178             df.info(verbose=True, memory_usage="deep")
179             logging.debug(
180                 f"\nCreation of dataframe {path} took: {time() - start}\n"
181             )
182         except NoFilesFound as err:
183             logging.error(f"No parquets found.\n{err}")
184         except EmptyDataFrame as err:
185             logging.error(f"No data.\n{err}")
186
187         return df
188
189     def read_all_data(self, days: int=None) -> dict:
190         """Read all data necessary for all applications.
191
192         :param days: Number of days to filter. If None, all data will be
193             downloaded.
194         :type days: int
195         :returns: A dictionary where keys are names of parquets and values are
196             the pandas dataframes with fetched data.
197         :rtype: dict(str: pandas.DataFrame)
198         """
199
200         self._data = dict()
201         self._data["trending"] = pd.DataFrame()
202         self._data["iterative"] = pd.DataFrame()
203         lst_trending = list()
204         lst_iterative = list()
205
206         for data_set in self._data_spec:
207             logging.info(
208                 f"Reading data for {data_set['data_type']} "
209                 f"{data_set['partition_name']} {data_set.get('release', '')}"
210             )
211             partition_filter = lambda part: True \
212                 if part[data_set["partition"]] == data_set["partition_name"] \
213                     else False
214
215             data = self._create_dataframe_from_parquet(
216                 path=data_set["path"],
217                 partition_filter=partition_filter,
218                 columns=data_set.get("columns", list()),
219                 categories=data_set.get("categories", list()),
220                 days=None if data_set["data_type"] == "iterative" else days
221             )
222
223             if data_set["data_type"] == "statistics":
224                 self._data["statistics"] = data
225             elif data_set["data_type"] == "trending":
226                 lst_trending.append(data)
227             elif data_set["data_type"] == "iterative":
228                 data["release"] = data_set["release"]
229                 data["release"] = data["release"].astype("category")
230                 lst_iterative.append(data)
231             else:
232                 raise NotImplementedError(
233                     f"The data type {data_set['data_type']} is not implemented."
234                 )
235
236         self._data["iterative"] = pd.concat(
237             lst_iterative,
238             ignore_index=True,
239             copy=False
240         )
241         self._data["trending"] = pd.concat(
242             lst_trending,
243             ignore_index=True,
244             copy=False
245         )
246
247         for key in self._data.keys():
248             logging.info(
249                 f"\nData frame {key}:"
250                 f"\n{self._data[key].memory_usage(deep=True)}\n"
251             )
252             self._data[key].info(verbose=True, memory_usage="deep")
253
254         mem_alloc = \
255             resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
256         logging.info(f"Memory allocation: {mem_alloc:.0f}MB")
257
258         return self._data