X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftools%2Fdash%2Fapp%2Fpal%2Fdata%2Fdata.py;h=77fd113a9c705112e53cb2b73b15ef50b4577418;hp=0956333e34d0682be2a17b21d90de2224de742f6;hb=22a45eda880bee367ff8937d8e764cd41905a0cd;hpb=1ec5035f813e726b4998be28a4df81327606ca85 diff --git a/resources/tools/dash/app/pal/data/data.py b/resources/tools/dash/app/pal/data/data.py index 0956333e34..77fd113a9c 100644 --- a/resources/tools/dash/app/pal/data/data.py +++ b/resources/tools/dash/app/pal/data/data.py @@ -113,6 +113,48 @@ class Data: f"specified.\n{err}" ) + def _get_list_of_files(self, + path, + last_modified_begin=None, + last_modified_end=None, + days=None) -> list: + """Get list of interested files stored in S3 compatible storage and + returns it. + + :param path: S3 prefix (accepts Unix shell-style wildcards) + (e.g. s3://bucket/prefix) or list of S3 objects paths + (e.g. [s3://bucket/key0, s3://bucket/key1]). + :param last_modified_begin: Filter the s3 files by the Last modified + date of the object. The filter is applied only after list all s3 + files. + :param last_modified_end: Filter the s3 files by the Last modified date + of the object. The filter is applied only after list all s3 files. + :param days: Number of days to filter. + :type path: Union[str, List[str]] + :type last_modified_begin: datetime, optional + :type last_modified_end: datetime, optional + :type days: integer, optional + :returns: List of file names. + :rtype: List + """ + if days: + last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days) + try: + file_list = wr.s3.list_objects( + path=path, + suffix="parquet", + last_modified_begin=last_modified_begin, + last_modified_end=last_modified_end + ) + if self._debug: + logging.info("\n".join(file_list)) + except NoFilesFound as err: + logging.error(f"No parquets found.\n{err}") + except EmptyDataFrame as err: + logging.error(f"No data.\n{err}") + + return file_list + def _create_dataframe_from_parquet(self, path, partition_filter=None, columns=None, @@ -142,12 +184,14 @@ class Data: files. :param last_modified_end: Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. + :param days: Number of days to filter. :type path: Union[str, List[str]] :type partition_filter: Callable[[Dict[str, str]], bool], optional :type columns: List[str], optional :type validate_schema: bool, optional :type last_modified_begin: datetime, optional :type last_modified_end: datetime, optional + :type days: integer, optional :returns: Pandas DataFrame or None if DataFrame cannot be fetched. :rtype: DataFrame """ @@ -183,6 +227,16 @@ class Data: self._data = df return df + def check_datasets(self, days: int=None): + """Read structure from parquet. + + :param days: Number of days back to the past for which the data will be + read. + :type days: int + """ + self._get_list_of_files(path=self._get_path("trending"), days=days) + self._get_list_of_files(path=self._get_path("statistics"), days=days) + def read_stats(self, days: int=None) -> tuple: """Read statistics from parquet.