C-Dash: Pre-load the data from parquets
[csit.git] / csit.infra.dash / app / cdash / data / data.py
index 7ddb443..8537cd8 100644 (file)
 """
 
 import logging
 """
 
 import logging
+import resource
 import awswrangler as wr
 import awswrangler as wr
+import pandas as pd
 
 from yaml import load, FullLoader, YAMLError
 from datetime import datetime, timedelta
 from time import time
 from pytz import UTC
 
 from yaml import load, FullLoader, YAMLError
 from datetime import datetime, timedelta
 from time import time
 from pytz import UTC
-from pandas import DataFrame
 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
 
 
 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
 
 
@@ -30,27 +31,24 @@ class Data:
     applications.
     """
 
     applications.
     """
 
-    def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+    def __init__(self, data_spec_file: str) -> None:
         """Initialize the Data object.
 
         :param data_spec_file: Path to file specifying the data to be read from
             parquets.
         """Initialize the Data object.
 
         :param data_spec_file: Path to file specifying the data to be read from
             parquets.
-        :param debug: If True, the debuf information is printed to stdout.
         :type data_spec_file: str
         :type data_spec_file: str
-        :type debug: bool
         :raises RuntimeError: if it is not possible to open data_spec_file or it
             is not a valid yaml file.
         """
 
         # Inputs:
         self._data_spec_file = data_spec_file
         :raises RuntimeError: if it is not possible to open data_spec_file or it
             is not a valid yaml file.
         """
 
         # Inputs:
         self._data_spec_file = data_spec_file
-        self._debug = debug
 
         # Specification of data to be read from parquets:
 
         # Specification of data to be read from parquets:
-        self._data_spec = None
+        self._data_spec = list()
 
         # Data frame to keep the data:
 
         # Data frame to keep the data:
-        self._data = None
+        self._data = pd.DataFrame()
 
         # Read from files:
         try:
 
         # Read from files:
         try:
@@ -71,48 +69,6 @@ class Data:
     def data(self):
         return self._data
 
     def data(self):
         return self._data
 
-    def _get_columns(self, parquet: str) -> list:
-        """Get the list of columns from the data specification file to be read
-        from parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have any columns specified.
-        :returns: List of columns.
-        :rtype: list
-        """
-
-        try:
-            return self._data_spec[parquet]["columns"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have any columns "
-                f"specified.\n{err}"
-            )
-
-    def _get_path(self, parquet: str) -> str:
-        """Get the path from the data specification file to be read from
-        parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have the path specified.
-        :returns: Path.
-        :rtype: str
-        """
-
-        try:
-            return self._data_spec[parquet]["path"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have the path "
-                f"specified.\n{err}"
-            )
-
     def _get_list_of_files(self,
         path,
         last_modified_begin=None,
     def _get_list_of_files(self,
         path,
         last_modified_begin=None,
@@ -147,8 +103,7 @@ class Data:
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
-            if self._debug:
-                logging.info("\n".join(file_list))
+            logging.debug("\n".join(file_list))
         except NoFilesFound as err:
             logging.error(f"No parquets found.\n{err}")
         except EmptyDataFrame as err:
         except NoFilesFound as err:
             logging.error(f"No parquets found.\n{err}")
         except EmptyDataFrame as err:
@@ -156,13 +111,16 @@ class Data:
 
         return file_list
 
 
         return file_list
 
-    def _create_dataframe_from_parquet(self,
-        path, partition_filter=None,
-        columns=None,
-        validate_schema=False,
-        last_modified_begin=None,
-        last_modified_end=None,
-        days=None) -> DataFrame:
+    def _create_dataframe_from_parquet(
+            self,
+            path, partition_filter=None,
+            columns=None,
+            categories=list(),
+            validate_schema=False,
+            last_modified_begin=None,
+            last_modified_end=None,
+            days=None
+        ) -> pd.DataFrame:
         """Read parquet stored in S3 compatible storage and returns Pandas
         Dataframe.
 
         """Read parquet stored in S3 compatible storage and returns Pandas
         Dataframe.
 
@@ -176,6 +134,8 @@ class Data:
             extracted from S3. This function MUST return a bool, True to read
             the partition or False to ignore it. Ignored if dataset=False.
         :param columns: Names of columns to read from the file(s).
             extracted from S3. This function MUST return a bool, True to read
             the partition or False to ignore it. Ignored if dataset=False.
         :param columns: Names of columns to read from the file(s).
+        :param categories: List of columns names that should be returned as
+            pandas.Categorical.
         :param validate_schema: Check that individual file schemas are all the
             same / compatible. Schemas within a folder prefix should all be the
             same. Disable if you have schemas that are different and want to
         :param validate_schema: Check that individual file schemas are all the
             same / compatible. Schemas within a folder prefix should all be the
             same. Disable if you have schemas that are different and want to
@@ -189,6 +149,7 @@ class Data:
         :type path: Union[str, List[str]]
         :type partition_filter: Callable[[Dict[str, str]], bool], optional
         :type columns: List[str], optional
         :type path: Union[str, List[str]]
         :type partition_filter: Callable[[Dict[str, str]], bool], optional
         :type columns: List[str], optional
+        :type categories: List[str], optional
         :type validate_schema: bool, optional
         :type last_modified_begin: datetime, optional
         :type last_modified_end: datetime, optional
         :type validate_schema: bool, optional
         :type last_modified_begin: datetime, optional
         :type last_modified_end: datetime, optional
@@ -209,142 +170,89 @@ class Data:
                 use_threads=True,
                 dataset=True,
                 columns=columns,
                 use_threads=True,
                 dataset=True,
                 columns=columns,
+                # categories=categories,
                 partition_filter=partition_filter,
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
                 partition_filter=partition_filter,
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
-            if self._debug:
-                df.info(verbose=True, memory_usage='deep')
-                logging.info(
-                    f"\nCreation of dataframe {path} took: {time() - start}\n"
-                )
+            df.info(verbose=True, memory_usage="deep")
+            logging.debug(
+                f"\nCreation of dataframe {path} took: {time() - start}\n"
+            )
         except NoFilesFound as err:
             logging.error(f"No parquets found.\n{err}")
         except EmptyDataFrame as err:
             logging.error(f"No data.\n{err}")
 
         except NoFilesFound as err:
             logging.error(f"No parquets found.\n{err}")
         except EmptyDataFrame as err:
             logging.error(f"No data.\n{err}")
 
-        self._data = df
         return df
 
         return df
 
-    def check_datasets(self, days: int=None):
-        """Read structure from parquet.
+    def read_all_data(self, days: int=None) -> dict:
+        """Read all data necessary for all applications.
 
 
-        :param days: Number of days back to the past for which the data will be
-            read.
+        :param days: Number of days to filter. If None, all data will be
+            downloaded.
         :type days: int
         :type days: int
+        :returns: A dictionary where keys are names of parquets and values are
+            the pandas dataframes with fetched data.
+        :rtype: dict(str: pandas.DataFrame)
         """
         """
-        self._get_list_of_files(path=self._get_path("trending"), days=days)
-        self._get_list_of_files(path=self._get_path("statistics"), days=days)
-
-    def read_stats(self, days: int=None) -> tuple:
-        """Read statistics from parquet.
 
 
-        It reads from:
-        - Suite Result Analysis (SRA) partition,
-        - NDRPDR trending partition,
-        - MRR trending partition.
+        self._data = dict()
+        self._data["trending"] = pd.DataFrame()
+        self._data["iterative"] = pd.DataFrame()
+        lst_trending = list()
+        lst_iterative = list()
 
 
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: tuple of pandas DataFrame-s with data read from specified
-            parquets.
-        :rtype: tuple of pandas DataFrame-s
-        """
-
-        l_stats = lambda part: True if part["stats_type"] == "sra" else False
-        l_mrr = lambda part: True if part["test_type"] == "mrr" else False
-        l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
-
-        return (
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics"),
-                partition_filter=l_stats,
-                columns=self._get_columns("statistics"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-mrr"),
-                partition_filter=l_mrr,
-                columns=self._get_columns("statistics-trending-mrr"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-ndrpdr"),
-                partition_filter=l_ndrpdr,
-                columns=self._get_columns("statistics-trending-ndrpdr"),
-                days=days
+        for data_set in self._data_spec:
+            logging.info(
+                f"Reading data for {data_set['data_type']} "
+                f"{data_set['partition_name']} {data_set.get('release', '')}"
             )
             )
-        )
-
-    def read_trending_mrr(self, days: int=None) -> DataFrame:
-        """Read MRR data partition from parquet.
+            partition_filter = lambda part: True \
+                if part[data_set["partition"]] == data_set["partition_name"] \
+                    else False
 
 
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-mrr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-mrr"),
-            days=days
-        )
-
-    def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
-        """Read NDRPDR data partition from iterative parquet.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
+            data = self._create_dataframe_from_parquet(
+                path=data_set["path"],
+                partition_filter=partition_filter,
+                columns=data_set.get("columns", list()),
+                categories=data_set.get("categories", list()),
+                days=None if data_set["data_type"] == "iterative" else days
+            )
 
 
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+            if data_set["data_type"] == "statistics":
+                self._data["statistics"] = data
+            elif data_set["data_type"] == "trending":
+                lst_trending.append(data)
+            elif data_set["data_type"] == "iterative":
+                data["release"] = data_set["release"]
+                data["release"] = data["release"].astype("category")
+                lst_iterative.append(data)
+            else:
+                raise NotImplementedError(
+                    f"The data type {data_set['data_type']} is not implemented."
+                )
 
 
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-ndrpdr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-ndrpdr"),
-            days=days
+        self._data["iterative"] = pd.concat(
+            lst_iterative,
+            ignore_index=True,
+            copy=False
         )
         )
-
-    def read_iterative_mrr(self, release: str) -> DataFrame:
-        """Read MRR data partition from iterative parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-mrr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-mrr")
+        self._data["trending"] = pd.concat(
+            lst_trending,
+            ignore_index=True,
+            copy=False
         )
 
         )
 
-    def read_iterative_ndrpdr(self, release: str) -> DataFrame:
-        """Read NDRPDR data partition from parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
+        for key in self._data.keys():
+            logging.info(
+                f"\nData frame {key}:"
+                f"\n{self._data[key].memory_usage(deep=True)}\n"
+            )
+            self._data[key].info(verbose=True, memory_usage="deep")
 
 
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+        mem_alloc = \
+            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
+        logging.info(f"Memory allocation: {mem_alloc:.0f}MB")
 
 
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-ndrpdr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-ndrpdr")
-        )
+        return self._data