feat(uti): Move directory
[csit.git] / resources / tools / dash / app / pal / data / data.py
diff --git a/resources/tools/dash/app/pal/data/data.py b/resources/tools/dash/app/pal/data/data.py
deleted file mode 100644 (file)
index 77fd113..0000000
+++ /dev/null
@@ -1,351 +0,0 @@
-# Copyright (c) 2022 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Prepare data for Plotly Dash applications.
-"""
-
-import logging
-import awswrangler as wr
-
-from yaml import load, FullLoader, YAMLError
-from datetime import datetime, timedelta
-from time import time
-from pytz import UTC
-from pandas import DataFrame
-from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
-
-
-class Data:
-    """Gets the data from parquets and stores it for further use by dash
-    applications.
-    """
-
-    def __init__(self, data_spec_file: str, debug: bool=False) -> None:
-        """Initialize the Data object.
-
-        :param data_spec_file: Path to file specifying the data to be read from
-            parquets.
-        :param debug: If True, the debuf information is printed to stdout.
-        :type data_spec_file: str
-        :type debug: bool
-        :raises RuntimeError: if it is not possible to open data_spec_file or it
-            is not a valid yaml file.
-        """
-
-        # Inputs:
-        self._data_spec_file = data_spec_file
-        self._debug = debug
-
-        # Specification of data to be read from parquets:
-        self._data_spec = None
-
-        # Data frame to keep the data:
-        self._data = None
-
-        # Read from files:
-        try:
-            with open(self._data_spec_file, "r") as file_read:
-                self._data_spec = load(file_read, Loader=FullLoader)
-        except IOError as err:
-            raise RuntimeError(
-                f"Not possible to open the file {self._data_spec_file,}\n{err}"
-            )
-        except YAMLError as err:
-            raise RuntimeError(
-                f"An error occurred while parsing the specification file "
-                f"{self._data_spec_file,}\n"
-                f"{err}"
-            )
-
-    @property
-    def data(self):
-        return self._data
-
-    def _get_columns(self, parquet: str) -> list:
-        """Get the list of columns from the data specification file to be read
-        from parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have any columns specified.
-        :returns: List of columns.
-        :rtype: list
-        """
-
-        try:
-            return self._data_spec[parquet]["columns"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have any columns "
-                f"specified.\n{err}"
-            )
-
-    def _get_path(self, parquet: str) -> str:
-        """Get the path from the data specification file to be read from
-        parquets.
-
-        :param parquet: The parquet's name.
-        :type parquet: str
-        :raises RuntimeError: if the parquet is not defined in the data
-            specification file or it does not have the path specified.
-        :returns: Path.
-        :rtype: str
-        """
-
-        try:
-            return self._data_spec[parquet]["path"]
-        except KeyError as err:
-            raise RuntimeError(
-                f"The parquet {parquet} is not defined in the specification "
-                f"file {self._data_spec_file} or it does not have the path "
-                f"specified.\n{err}"
-            )
-
-    def _get_list_of_files(self,
-        path,
-        last_modified_begin=None,
-        last_modified_end=None,
-        days=None) -> list:
-        """Get list of interested files stored in S3 compatible storage and
-        returns it.
-
-        :param path: S3 prefix (accepts Unix shell-style wildcards)
-            (e.g. s3://bucket/prefix) or list of S3 objects paths
-            (e.g. [s3://bucket/key0, s3://bucket/key1]).
-        :param last_modified_begin: Filter the s3 files by the Last modified
-            date of the object. The filter is applied only after list all s3
-            files.
-        :param last_modified_end: Filter the s3 files by the Last modified date
-            of the object. The filter is applied only after list all s3 files.
-        :param days: Number of days to filter.
-        :type path: Union[str, List[str]]
-        :type last_modified_begin: datetime, optional
-        :type last_modified_end: datetime, optional
-        :type days: integer, optional
-        :returns: List of file names.
-        :rtype: List
-        """
-        if days:
-            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
-        try:
-            file_list = wr.s3.list_objects(
-                path=path,
-                suffix="parquet",
-                last_modified_begin=last_modified_begin,
-                last_modified_end=last_modified_end
-            )
-            if self._debug:
-                logging.info("\n".join(file_list))
-        except NoFilesFound as err:
-            logging.error(f"No parquets found.\n{err}")
-        except EmptyDataFrame as err:
-            logging.error(f"No data.\n{err}")
-
-        return file_list
-
-    def _create_dataframe_from_parquet(self,
-        path, partition_filter=None,
-        columns=None,
-        validate_schema=False,
-        last_modified_begin=None,
-        last_modified_end=None,
-        days=None) -> DataFrame:
-        """Read parquet stored in S3 compatible storage and returns Pandas
-        Dataframe.
-
-        :param path: S3 prefix (accepts Unix shell-style wildcards)
-            (e.g. s3://bucket/prefix) or list of S3 objects paths
-            (e.g. [s3://bucket/key0, s3://bucket/key1]).
-        :param partition_filter: Callback Function filters to apply on PARTITION
-            columns (PUSH-DOWN filter). This function MUST receive a single
-            argument (Dict[str, str]) where keys are partitions names and values
-            are partitions values. Partitions values will be always strings
-            extracted from S3. This function MUST return a bool, True to read
-            the partition or False to ignore it. Ignored if dataset=False.
-        :param columns: Names of columns to read from the file(s).
-        :param validate_schema: Check that individual file schemas are all the
-            same / compatible. Schemas within a folder prefix should all be the
-            same. Disable if you have schemas that are different and want to
-            disable this check.
-        :param last_modified_begin: Filter the s3 files by the Last modified
-            date of the object. The filter is applied only after list all s3
-            files.
-        :param last_modified_end: Filter the s3 files by the Last modified date
-            of the object. The filter is applied only after list all s3 files.
-        :param days: Number of days to filter.
-        :type path: Union[str, List[str]]
-        :type partition_filter: Callable[[Dict[str, str]], bool], optional
-        :type columns: List[str], optional
-        :type validate_schema: bool, optional
-        :type last_modified_begin: datetime, optional
-        :type last_modified_end: datetime, optional
-        :type days: integer, optional
-        :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
-        :rtype: DataFrame
-        """
-        df = None
-        start = time()
-        if days:
-            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
-        try:
-            df = wr.s3.read_parquet(
-                path=path,
-                path_suffix="parquet",
-                ignore_empty=True,
-                validate_schema=validate_schema,
-                use_threads=True,
-                dataset=True,
-                columns=columns,
-                partition_filter=partition_filter,
-                last_modified_begin=last_modified_begin,
-                last_modified_end=last_modified_end
-            )
-            if self._debug:
-                df.info(verbose=True, memory_usage='deep')
-                logging.info(
-                    u"\n"
-                    f"Creation of dataframe {path} took: {time() - start}"
-                    u"\n"
-                )
-        except NoFilesFound as err:
-            logging.error(f"No parquets found.\n{err}")
-        except EmptyDataFrame as err:
-            logging.error(f"No data.\n{err}")
-
-        self._data = df
-        return df
-
-    def check_datasets(self, days: int=None):
-        """Read structure from parquet.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        """
-        self._get_list_of_files(path=self._get_path("trending"), days=days)
-        self._get_list_of_files(path=self._get_path("statistics"), days=days)
-
-    def read_stats(self, days: int=None) -> tuple:
-        """Read statistics from parquet.
-
-        It reads from:
-        - Suite Result Analysis (SRA) partition,
-        - NDRPDR trending partition,
-        - MRR trending partition.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: tuple of pandas DataFrame-s with data read from specified
-            parquets.
-        :rtype: tuple of pandas DataFrame-s
-        """
-
-        l_stats = lambda part: True if part["stats_type"] == "sra" else False
-        l_mrr = lambda part: True if part["test_type"] == "mrr" else False
-        l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
-
-        return (
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics"),
-                partition_filter=l_stats,
-                columns=self._get_columns("statistics"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-mrr"),
-                partition_filter=l_mrr,
-                columns=self._get_columns("statistics-trending-mrr"),
-                days=days
-            ),
-            self._create_dataframe_from_parquet(
-                path=self._get_path("statistics-trending-ndrpdr"),
-                partition_filter=l_ndrpdr,
-                columns=self._get_columns("statistics-trending-ndrpdr"),
-                days=days
-            )
-        )
-
-    def read_trending_mrr(self, days: int=None) -> DataFrame:
-        """Read MRR data partition from parquet.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-mrr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-mrr"),
-            days=days
-        )
-
-    def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
-        """Read NDRPDR data partition from iterative parquet.
-
-        :param days: Number of days back to the past for which the data will be
-            read.
-        :type days: int
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("trending-ndrpdr"),
-            partition_filter=lambda_f,
-            columns=self._get_columns("trending-ndrpdr"),
-            days=days
-        )
-
-    def read_iterative_mrr(self, release: str) -> DataFrame:
-        """Read MRR data partition from iterative parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-mrr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-mrr")
-        )
-
-    def read_iterative_ndrpdr(self, release: str) -> DataFrame:
-        """Read NDRPDR data partition from parquet.
-
-        :param release: The CSIT release from which the data will be read.
-        :type release: str
-        :returns: Pandas DataFrame with read data.
-        :rtype: DataFrame
-        """
-
-        lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
-
-        return self._create_dataframe_from_parquet(
-            path=self._get_path("iterative-ndrpdr").format(release=release),
-            partition_filter=lambda_f,
-            columns=self._get_columns("iterative-ndrpdr")
-        )