C-Dash: Pre-load the data from parquets
[csit.git] / csit.infra.dash / app / cdash / data / data.py
index 7ddb443..8537cd8 100644 (file)
 """
 
 import logging
+import resource
 import awswrangler as wr
+import pandas as pd
 
 from yaml import load, FullLoader, YAMLError
 from datetime import datetime, timedelta
 from time import time
 from pytz import UTC
-from pandas import DataFrame
 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
 
 
@@ -30,27 +31,24 @@ class Data:
     applications.
     """
 
-    def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+    def __init__(self, data_spec_file: str) -> None:
         """Initialize the Data object.
 
         :param data_spec_file: Path to file specifying the data to be read from
             parquets.
-        :param debug: If True, the debuf information is printed to stdout.
         :type data_spec_file: str
-        :type debug: bool
         :raises RuntimeError: if it is not possible to open data_spec_file or it
             is not a valid yaml file.
         """
 
         # Inputs:
         self._data_spec_file = data_spec_file
-        self._debug = debug
 
         # Specification of data to be read from parquets:
-        self._data_spec = None
+        self._data_spec = list()
 
-        # Data frame to keep the data:
-        self._data = None
+        # Dictionary of data frames to keep the data:
+        self._data = dict()
 
         # Read from files:
         try:
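
The specification file is YAML which yaml.load() turns into the list held in
self._data_spec; the loop in read_all_data() below indexes each entry by the
keys "data_type", "partition", "partition_name", "path", "columns",
"categories" and, for iterative data, "release". A minimal sketch of one
parsed entry, with hypothetical path, column and category values:

    # Hypothetical parsed data spec entry; the keys mirror those accessed in
    # read_all_data() below, the values are illustrative only.
    data_spec = [
        {
            "data_type": "trending",
            "partition": "test_type",
            "partition_name": "mrr",
            "path": "s3://example-bucket/trending",
            "columns": ["job", "build", "start_time"],
            "categories": ["job", "build"]
        }
    ]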
@@ -71,48 +69,6 @@ class Data:
     def data(self):
         return self._data
 
-    def _get_columns(self, parquet: str) -> list:
-        """Get the list of columns from the data specification file to be read
-        from parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have any columns specified.
-        :returns: List of columns.
-        :rtype: list
-        """
-
-        try:
-            return self._data_spec[parquet]["columns"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have any columns "
-                f"specified.\n{err}"
-            )
-
-    def _get_path(self, parquet: str) -> str:
-        """Get the path from the data specification file to be read from
-        parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have the path specified.
-        :returns: Path.
-        :rtype: str
-        """
-
-        try:
-            return self._data_spec[parquet]["path"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have the path "
-                f"specified.\n{err}"
-            )
-
     def _get_list_of_files(self,
         path,
         last_modified_begin=None,
@@ -147,8 +103,7 @@ class Data:
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
-            if self._debug:
-                logging.info("\n".join(file_list))
+            logging.debug("\n".join(file_list))
         except NoFilesFound as err:
             logging.error(f"No parquets found.\n{err}")
         except EmptyDataFrame as err:
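
With the debug flag gone, the file listing above is emitted via
logging.debug(), so it only appears when the logger is configured for the
DEBUG level; a minimal sketch:

    import logging

    # Emit DEBUG records (including the per-path file listings) to stdout.
    logging.basicConfig(level=logging.DEBUG)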
@@ -156,13 +111,16 @@ class Data:
 
         return file_list
 
-    def _create_dataframe_from_parquet(self,
-        path, partition_filter=None,
-        columns=None,
-        validate_schema=False,
-        last_modified_begin=None,
-        last_modified_end=None,
-        days=None) -> DataFrame:
+    def _create_dataframe_from_parquet(
+            self,
+            path, partition_filter=None,
+            columns=None,
+            categories=None,
+            validate_schema=False,
+            last_modified_begin=None,
+            last_modified_end=None,
+            days=None
+        ) -> pd.DataFrame:
         """Read parquet stored in S3 compatible storage and returns Pandas
         Dataframe.
 
@@ -176,6 +134,8 @@ class Data:
             extracted from S3. This function MUST return a bool, True to read
             the partition or False to ignore it. Ignored if dataset=False.
         :param columns: Names of columns to read from the file(s).
+        :param categories: List of column names that should be returned as
+            pandas.Categorical.
         :param validate_schema: Check that individual file schemas are all the
             same / compatible. Schemas within a folder prefix should all be the
             same. Disable if you have schemas that are different and want to
@@ -189,6 +149,7 @@ class Data:
         :type path: Union[str, List[str]]
         :type partition_filter: Callable[[Dict[str, str]], bool], optional
         :type columns: List[str], optional
+        :type categories: List[str], optional
         :type validate_schema: bool, optional
         :type last_modified_begin: datetime, optional
         :type last_modified_end: datetime, optional
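
As documented above, partition_filter receives one dict mapping partition
column names to their string values and returns a bool. A filter equivalent
to the hard-coded ones removed below might look like this sketch:

    # Keep only partitions whose "test_type" partition equals "mrr".
    l_mrr = lambda part: part["test_type"] == "mrr"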
@@ -209,142 +170,89 @@ class Data:
                 use_threads=True,
                 dataset=True,
                 columns=columns,
+                # categories=categories,
                 partition_filter=partition_filter,
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
-            if self._debug:
-                df.info(verbose=True, memory_usage='deep')
-                logging.info(
-                    f"\nCreation of dataframe {path} took: {time() - start}\n"
-                )
+            df.info(verbose=True, memory_usage="deep")
+            logging.debug(
+                f"\nCreation of dataframe {path} took: {time() - start}\n"
+            )
         except NoFilesFound as err:
             logging.error(f"No parquets found.\n{err}")
         except EmptyDataFrame as err:
             logging.error(f"No data.\n{err}")
 
-        self._data = df
         return df
 
-    def check_datasets(self, days: int=None):
-        """Read structure from parquet.
+    def read_all_data(self, days: int=None) -> dict:
+        """Read all data necessary for all applications.
 
-        :param days: Number of days back to the past for which the data will be
-            read.
+        :param days: Number of days back to the past for which trending and
+            statistics data will be read. If None, all data is read. Iterative
+            data is always read in full.
         :type days: int
+        :returns: A dictionary where keys are the data types defined in the
+            data specification and values are pandas DataFrames with the
+            fetched data.
+        :rtype: dict(str: pandas.DataFrame)
         """
-        self._get_list_of_files(path=self._get_path("trending"), days=days)
-        self._get_list_of_files(path=self._get_path("statistics"), days=days)
-
-    def read_stats(self, days: int=None) -> tuple:
-        """Read statistics from parquet.
 
-        It reads from:
-        - Suite Result Analysis (SRA) partition,
-        - NDRPDR trending partition,
-        - MRR trending partition.
+        self._data = dict()
+        self._data["trending"] = pd.DataFrame()
+        self._data["iterative"] = pd.DataFrame()
+        lst_trending = list()
+        lst_iterative = list()
 
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: tuple of pandas DataFrame-s with data read from specified
-            parquets.
-        :rtype: tuple of pandas DataFrame-s
-        """
-
-        l_stats = lambda part: True if part["stats_type"] == "sra" else False
-        l_mrr = lambda part: True if part["test_type"] == "mrr" else False
-        l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
-
-        return (
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics"),
-                partition_filter=l_stats,
-                columns=self._get_columns("statistics"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-mrr"),
-                partition_filter=l_mrr,
-                columns=self._get_columns("statistics-trending-mrr"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-ndrpdr"),
-                partition_filter=l_ndrpdr,
-                columns=self._get_columns("statistics-trending-ndrpdr"),
-                days=days
+        for data_set in self._data_spec:
+            logging.info(
+                f"Reading data for {data_set['data_type']} "
+                f"{data_set['partition_name']} {data_set.get('release', '')}"
             )
-        )
-
-    def read_trending_mrr(self, days: int=None) -> DataFrame:
-        """Read MRR data partition from parquet.
+            partition_filter = lambda part: \
+                part[data_set["partition"]] == data_set["partition_name"]
 
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-mrr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-mrr"),
-            days=days
-        )
-
-    def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
-        """Read NDRPDR data partition from iterative parquet.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
+            data = self._create_dataframe_from_parquet(
+                path=data_set["path"],
+                partition_filter=partition_filter,
+                columns=data_set.get("columns", list()),
+                categories=data_set.get("categories", list()),
+                days=None if data_set["data_type"] == "iterative" else days
+            )
 
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+            if data_set["data_type"] == "statistics":
+                self._data["statistics"] = data
+            elif data_set["data_type"] == "trending":
+                lst_trending.append(data)
+            elif data_set["data_type"] == "iterative":
+                data["release"] = data_set["release"]
+                data["release"] = data["release"].astype("category")
+                lst_iterative.append(data)
+            else:
+                raise NotImplementedError(
+                    f"The data type {data_set['data_type']} is not implemented."
+                )
 
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-ndrpdr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-ndrpdr"),
-            days=days
+        self._data["iterative"] = pd.concat(
+            lst_iterative,
+            ignore_index=True,
+            copy=False
         )
-
-    def read_iterative_mrr(self, release: str) -> DataFrame:
-        """Read MRR data partition from iterative parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-mrr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-mrr")
+        self._data["trending"] = pd.concat(
+            lst_trending,
+            ignore_index=True,
+            copy=False
         )
 
-    def read_iterative_ndrpdr(self, release: str) -> DataFrame:
-        """Read NDRPDR data partition from parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
+        for key, df in self._data.items():
+            logging.info(
+                f"\nData frame {key}:"
+                f"\n{df.memory_usage(deep=True)}\n"
+            )
+            df.info(verbose=True, memory_usage="deep")
 
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+        # ru_maxrss is in kilobytes on Linux; convert to megabytes.
+        mem_alloc = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
+        logging.info(f"Peak memory usage: {mem_alloc:.0f}MB")
 
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-ndrpdr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-ndrpdr")
-        )
+        return self._data
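
With the per-partition readers gone, callers obtain everything through one
call. A minimal usage sketch (the import path follows the file location, the
spec file name is hypothetical):

    from cdash.data.data import Data

    # Trending and statistics limited to the last 180 days; iterative data
    # is always read in full regardless of the days argument.
    data = Data(data_spec_file="data_spec.yaml").read_all_data(days=180)
    trending = data["trending"]
    iterative = data["iterative"]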