diff --git a/resources/tools/dash/app/pal/data/data.py b/resources/tools/dash/app/pal/data/data.py
index 296db024c0..77fd113a9c 100644
--- a/resources/tools/dash/app/pal/data/data.py
+++ b/resources/tools/dash/app/pal/data/data.py
@@ -15,15 +15,13 @@
 """
 
 import logging
+import awswrangler as wr
 
 from yaml import load, FullLoader, YAMLError
 from datetime import datetime, timedelta
 from time import time
 from pytz import UTC
 from pandas import DataFrame
-
-import awswrangler as wr
-
 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
@@ -115,6 +113,48 @@
                 f"specified.\n{err}"
             )
 
+    def _get_list_of_files(self,
+                           path,
+                           last_modified_begin=None,
+                           last_modified_end=None,
+                           days=None) -> list:
+        """Get a list of files stored in S3-compatible storage and return it.
+
+        :param path: S3 prefix (accepts Unix shell-style wildcards)
+            (e.g. s3://bucket/prefix) or list of S3 object paths
+            (e.g. [s3://bucket/key0, s3://bucket/key1]).
+        :param last_modified_begin: Filter the S3 files by the Last modified
+            date of the object. The filter is applied only after listing all
+            S3 files.
+        :param last_modified_end: Filter the S3 files by the Last modified date
+            of the object, applied only after listing all S3 files.
+        :param days: Number of days back from now to filter by; overrides
+            last_modified_begin.
+        :type path: Union[str, List[str]]
+        :type last_modified_begin: datetime, optional
+        :type last_modified_end: datetime, optional
+        :type days: int, optional
+        :returns: List of file names.
+        :rtype: list
+        """
+        file_list = list()  # Returned unchanged if the listing below fails.
+        if days:
+            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
+        try:
+            file_list = wr.s3.list_objects(
+                path=path,
+                suffix="parquet",
+                last_modified_begin=last_modified_begin,
+                last_modified_end=last_modified_end
+            )
+            if self._debug:
+                logging.info("\n".join(file_list))
+        except NoFilesFound as err:
+            logging.error(f"No parquets found.\n{err}")
+        except EmptyDataFrame as err:
+            logging.error(f"No data.\n{err}")
+
+        return file_list
+
     def _create_dataframe_from_parquet(self,
                                        path,
                                        partition_filter=None,
                                        columns=None,
@@ -144,12 +184,14 @@
         files.
         :param last_modified_end: Filter the s3 files by the Last modified date
             of the object. The filter is applied only after list all s3 files.
+        :param days: Number of days back from now to filter by; overrides
+            last_modified_begin.
         :type path: Union[str, List[str]]
         :type partition_filter: Callable[[Dict[str, str]], bool], optional
         :type columns: List[str], optional
         :type validate_schema: bool, optional
         :type last_modified_begin: datetime, optional
         :type last_modified_end: datetime, optional
+        :type days: int, optional
         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
         :rtype: DataFrame
         """
@@ -185,6 +227,16 @@
         self._data = df
         return df
 
+    def check_datasets(self, days: int=None):
+        """Check the stored datasets by listing their parquet files.
+
+        :param days: Number of days back to the past for which the files will
+            be listed.
+        :type days: int
+        """
+        self._get_list_of_files(path=self._get_path("trending"), days=days)
+        self._get_list_of_files(path=self._get_path("statistics"), days=days)
+
     def read_stats(self, days: int=None) -> tuple:
         """Read statistics from parquet.
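
The date-window filter in _get_list_of_files() reduces to the sketch below: a days value overwrites last_modified_begin with a UTC timestamp that many days in the past, and wr.s3.list_objects() then returns only objects with the given suffix modified after that point. This is a minimal standalone sketch; the bucket path is a placeholder, and the call assumes S3 credentials are already configured in the environment:

    from datetime import datetime, timedelta

    import awswrangler as wr
    from pytz import UTC

    days = 5
    # Same computation as in _get_list_of_files(): a timestamp `days` days ago.
    begin = datetime.now(tz=UTC) - timedelta(days=days)
    files = wr.s3.list_objects(
        path="s3://example-bucket/csit/trending",  # placeholder path
        suffix="parquet",
        last_modified_begin=begin
    )
    print(f"{len(files)} parquet file(s) modified in the last {days} days")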
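
_create_dataframe_from_parquet() also documents a partition_filter callable, which awswrangler honors only when reading a dataset. The sketch below shows the shape such a callable must have; the partition column name "test_type" and the bucket path are illustrative, not taken from this diff:

    import awswrangler as wr

    def keep_mrr(partitions: dict) -> bool:
        # wr.s3.read_parquet() calls this once per partition with a dict
        # mapping partition-column names to their string values; returning
        # True keeps that partition in the result.
        return partitions.get("test_type") == "mrr"  # illustrative column

    df = wr.s3.read_parquet(
        path="s3://example-bucket/csit/trending",  # placeholder path
        partition_filter=keep_mrr,
        dataset=True  # partition_filter is ignored unless dataset=True
    )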
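
A note on usage: the new check_datasets() method only lists the parquet files (and logs them when debug is enabled); it does not load any data into a DataFrame. A minimal sketch of a call site follows; the import path, the Data constructor signature, and the spec-file location are assumptions based on this module's context, not confirmed by the diff:

    from pal.data.data import Data  # import path assumed from the file path above

    # Assumed constructor: Data(data_spec_file, debug). With debug=True,
    # _get_list_of_files() logs the discovered file names via logging.info().
    data = Data(data_spec_file="pal/data/data.yaml", debug=True)

    # Log the trending and statistics parquet files modified in the last 5 days.
    data.check_datasets(days=5)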