feat(uti): Add some more debug tools
[csit.git] / resources / tools / dash / app / pal / data / data.py
index 0956333..77fd113 100644 (file)
@@ -113,6 +113,48 @@ class Data:
                 f"specified.\n{err}"
             )
 
+    def _get_list_of_files(self,
+        path,
+        last_modified_begin=None,
+        last_modified_end=None,
+        days=None) -> list:
+        """Get list of interested files stored in S3 compatible storage and
+        returns it.
+
+        :param path: S3 prefix (accepts Unix shell-style wildcards)
+            (e.g. s3://bucket/prefix) or list of S3 objects paths
+            (e.g. [s3://bucket/key0, s3://bucket/key1]).
+        :param last_modified_begin: Filter the s3 files by the Last modified
+            date of the object. The filter is applied only after list all s3
+            files.
+        :param last_modified_end: Filter the s3 files by the Last modified date
+            of the object. The filter is applied only after list all s3 files.
+        :param days: Number of days to filter.
+        :type path: Union[str, List[str]]
+        :type last_modified_begin: datetime, optional
+        :type last_modified_end: datetime, optional
+        :type days: integer, optional
+        :returns: List of file names.
+        :rtype: List
+        """
+        if days:
+            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
+        try:
+            file_list = wr.s3.list_objects(
+                path=path,
+                suffix="parquet",
+                last_modified_begin=last_modified_begin,
+                last_modified_end=last_modified_end
+            )
+            if self._debug:
+                logging.info("\n".join(file_list))
+        except NoFilesFound as err:
+            logging.error(f"No parquets found.\n{err}")
+        except EmptyDataFrame as err:
+            logging.error(f"No data.\n{err}")
+
+        return file_list
+
     def _create_dataframe_from_parquet(self,
         path, partition_filter=None,
         columns=None,
@@ -142,12 +184,14 @@ class Data:
             files.
         :param last_modified_end: Filter the s3 files by the Last modified date
             of the object. The filter is applied only after list all s3 files.
+        :param days: Number of days to filter.
         :type path: Union[str, List[str]]
         :type partition_filter: Callable[[Dict[str, str]], bool], optional
         :type columns: List[str], optional
         :type validate_schema: bool, optional
         :type last_modified_begin: datetime, optional
         :type last_modified_end: datetime, optional
+        :type days: integer, optional
         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
         :rtype: DataFrame
         """
@@ -183,6 +227,16 @@ class Data:
         self._data = df
         return df
 
+    def check_datasets(self, days: int=None):
+        """Read structure from parquet.
+
+        :param days: Number of days back to the past for which the data will be
+            read.
+        :type days: int
+        """
+        self._get_list_of_files(path=self._get_path("trending"), days=days)
+        self._get_list_of_files(path=self._get_path("statistics"), days=days)
+
     def read_stats(self, days: int=None) -> tuple:
         """Read statistics from parquet.