C-Dash: Add VPP Device coverage data
[csit.git] / csit.infra.dash / app / cdash / data / data.py
index 7ddb443..a0d698e 100644 (file)
 """
 
 import logging
+import resource
 import awswrangler as wr
+import pandas as pd
 
 from yaml import load, FullLoader, YAMLError
 from datetime import datetime, timedelta
 from time import time
 from pytz import UTC
-from pandas import DataFrame
 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
 
 
@@ -30,27 +31,29 @@ class Data:
     applications.
     """
 
-    def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+    def __init__(self, data_spec_file: str) -> None:
         """Initialize the Data object.
 
         :param data_spec_file: Path to file specifying the data to be read from
             parquets.
-        :param debug: If True, the debuf information is printed to stdout.
         :type data_spec_file: str
-        :type debug: bool
         :raises RuntimeError: if it is not possible to open data_spec_file or it
             is not a valid yaml file.
         """
 
         # Inputs:
         self._data_spec_file = data_spec_file
-        self._debug = debug
 
         # Specification of data to be read from parquets:
-        self._data_spec = None
+        self._data_spec = list()
 
         # Data frame to keep the data:
-        self._data = None
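+        # Keys match the "data_type" values used in the data specification
+        # file; the frames are populated by read_all_data().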
+        self._data = {
+            "statistics": pd.DataFrame(),
+            "trending": pd.DataFrame(),
+            "iterative": pd.DataFrame(),
+            "coverage": pd.DataFrame()
+        }
 
         # Read from files:
         try:
@@ -71,53 +74,13 @@ class Data:
     def data(self):
         return self._data
 
-    def _get_columns(self, parquet: str) -> list:
-        """Get the list of columns from the data specification file to be read
-        from parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have any columns specified.
-        :returns: List of columns.
-        :rtype: list
-        """
-
-        try:
-            return self._data_spec[parquet]["columns"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have any columns "
-                f"specified.\n{err}"
-            )
-
-    def _get_path(self, parquet: str) -> str:
-        """Get the path from the data specification file to be read from
-        parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have the path specified.
-        :returns: Path.
-        :rtype: str
-        """
-
-        try:
-            return self._data_spec[parquet]["path"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have the path "
-                f"specified.\n{err}"
-            )
-
-    def _get_list_of_files(self,
-        path,
-        last_modified_begin=None,
-        last_modified_end=None,
-        days=None) -> list:
+    @staticmethod
+    def _get_list_of_files(
+            path,
+            last_modified_begin=None,
+            last_modified_end=None,
+            days=None
+        ) -> list:
         """Get list of interested files stored in S3 compatible storage and
         returns it.
 
@@ -147,8 +110,7 @@ class Data:
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
-            if self._debug:
-                logging.info("\n".join(file_list))
+            logging.debug("\n".join(file_list))
         except NoFilesFound as err:
             logging.error(f"No parquets found.\n{err}")
         except EmptyDataFrame as err:
@@ -156,13 +118,15 @@ class Data:
 
         return file_list
 
-    def _create_dataframe_from_parquet(self,
-        path, partition_filter=None,
-        columns=None,
-        validate_schema=False,
-        last_modified_begin=None,
-        last_modified_end=None,
-        days=None) -> DataFrame:
+    @staticmethod
+    def _create_dataframe_from_parquet(
+            path, partition_filter=None,
+            columns=None,
+            validate_schema=False,
+            last_modified_begin=None,
+            last_modified_end=None,
+            days=None
+        ) -> pd.DataFrame:
         """Read parquet stored in S3 compatible storage and returns Pandas
         Dataframe.
 
@@ -196,7 +160,7 @@ class Data:
-        :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
+        :returns: Pandas DataFrame with the fetched data; an empty DataFrame
+            is returned if nothing can be read.
         :rtype: DataFrame
         """
-        df = None
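+        # An empty DataFrame (rather than None) is returned if the read fails.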
+        df = pd.DataFrame()
         start = time()
         if days:
             last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
@@ -213,138 +177,103 @@ class Data:
                 last_modified_begin=last_modified_begin,
                 last_modified_end=last_modified_end
             )
-            if self._debug:
-                df.info(verbose=True, memory_usage='deep')
-                logging.info(
-                    f"\nCreation of dataframe {path} took: {time() - start}\n"
-                )
+            df.info(verbose=True, memory_usage="deep")
+            logging.debug(
+                f"\nCreation of dataframe {path} took: {time() - start}\n"
+            )
         except NoFilesFound as err:
-            logging.error(f"No parquets found.\n{err}")
+            logging.error(
+                f"No parquets found in specified time period.\n"
+                f"Nr of days: {days}\n"
+                f"last_modified_begin: {last_modified_begin}\n"
+                f"{err}"
+            )
         except EmptyDataFrame as err:
-            logging.error(f"No data.\n{err}")
+            logging.error(
+                f"No data in parquets in specified time period.\n"
+                f"Nr of days: {days}\n"
+                f"last_modified_begin: {last_modified_begin}\n"
+                f"{err}"
+            )
 
-        self._data = df
         return df
 
-    def check_datasets(self, days: int=None):
-        """Read structure from parquet.
+    def read_all_data(self, days: int=None) -> dict:
+        """Read all data necessary for all applications.
 
-        :param days: Number of days back to the past for which the data will be
-            read.
+        :param days: Number of days back to the past for which trending and
+            statistics data will be read. If None, all data is downloaded.
         :type days: int
-        """
-        self._get_list_of_files(path=self._get_path("trending"), days=days)
-        self._get_list_of_files(path=self._get_path("statistics"), days=days)
-
-    def read_stats(self, days: int=None) -> tuple:
-        """Read statistics from parquet.
-
-        It reads from:
-        - Suite Result Analysis (SRA) partition,
-        - NDRPDR trending partition,
-        - MRR trending partition.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: tuple of pandas DataFrame-s with data read from specified
-            parquets.
-        :rtype: tuple of pandas DataFrame-s
+        :returns: A dictionary where keys are the data types ("statistics",
+            "trending", "iterative", "coverage") and values are pandas
+            DataFrames with the fetched data.
+        :rtype: dict(str: pandas.DataFrame)
         """
 
-        l_stats = lambda part: True if part["stats_type"] == "sra" else False
-        l_mrr = lambda part: True if part["test_type"] == "mrr" else False
-        l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
+        lst_trending = list()
+        lst_iterative = list()
+        lst_coverage = list()
 
-        return (
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics"),
-                partition_filter=l_stats,
-                columns=self._get_columns("statistics"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-mrr"),
-                partition_filter=l_mrr,
-                columns=self._get_columns("statistics-trending-mrr"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-ndrpdr"),
-                partition_filter=l_ndrpdr,
-                columns=self._get_columns("statistics-trending-ndrpdr"),
-                days=days
+        for data_set in self._data_spec:
+            logging.info(
+                f"Reading data for {data_set['data_type']} "
+                f"{data_set['partition_name']} {data_set.get('release', '')}"
+            )
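+            # Keep only rows whose partition column matches the partition
+            # name configured for this data set.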
+            partition_filter = lambda part: \
+                part[data_set["partition"]] == data_set["partition_name"]
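+            # Trending and statistics data are limited to the last "days"
+            # days; iterative and coverage data are always read in full.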
+            if data_set["data_type"] in ("trending", "statistics"):
+                time_period = days
+            else:
+                time_period = None
+            data = Data._create_dataframe_from_parquet(
+                path=data_set["path"],
+                partition_filter=partition_filter,
+                columns=data_set.get("columns", None),
+                days=time_period
             )
-        )
-
-    def read_trending_mrr(self, days: int=None) -> DataFrame:
-        """Read MRR data partition from parquet.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
 
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
+            if data_set["data_type"] == "statistics":
+                self._data["statistics"] = data
+            elif data_set["data_type"] == "trending":
+                lst_trending.append(data)
+            elif data_set["data_type"] == "iterative":
+                data["release"] = data_set["release"]
+                data["release"] = data["release"].astype("category")
+                lst_iterative.append(data)
+            elif data_set["data_type"] == "coverage":
+                data["release"] = data_set["release"]
+                data["release"] = data["release"].astype("category")
+                lst_coverage.append(data)
+            else:
+                raise NotImplementedError(
+                    f"The data type {data_set['data_type']} is not implemented."
+                )
 
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-mrr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-mrr"),
-            days=days
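+        # Concatenate the per-release lists collected above into single
+        # frames.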
+        self._data["iterative"] = pd.concat(
+            lst_iterative,
+            ignore_index=True,
+            copy=False
         )
-
-    def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
-        """Read NDRPDR data partition from iterative parquet.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-ndrpdr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-ndrpdr"),
-            days=days
+        self._data["trending"] = pd.concat(
+            lst_trending,
+            ignore_index=True,
+            copy=False
         )
-
-    def read_iterative_mrr(self, release: str) -> DataFrame:
-        """Read MRR data partition from iterative parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-mrr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-mrr")
+        self._data["coverage"] = pd.concat(
+            lst_coverage,
+            ignore_index=True,
+            copy=False
         )
 
-    def read_iterative_ndrpdr(self, release: str) -> DataFrame:
-        """Read NDRPDR data partition from parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
+        for key in self._data.keys():
+            logging.info(
+                f"\nData frame {key}:"
+                f"\n{self._data[key].memory_usage(deep=True)}\n"
+            )
+            self._data[key].info(verbose=True, memory_usage="deep")
 
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+        mem_alloc = \
+            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
+        logging.info(f"Memory allocation: {mem_alloc:.0f}MB")
 
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-ndrpdr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-ndrpdr")
-        )
+        return self._data
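
A minimal usage sketch (not part of the patch) showing how the refactored class
is expected to be consumed, assuming the data specification YAML is a list of
entries with the keys read above ("data_type", "partition", "partition_name",
"path", and optionally "columns" and "release"); the import path, spec file
name and number of days below are illustrative placeholders only:

    # Hypothetical example; module path, file name and days are placeholders.
    from data.data import Data

    data = Data(data_spec_file="data.yaml")
    frames = data.read_all_data(days=180)  # "days" limits trending/statistics

    # Each value is a pandas DataFrame; iterative and coverage frames carry a
    # categorical "release" column added while reading.
    trending = frames["trending"]
    coverage = frames["coverage"]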