# See the License for the specific language governing permissions and
# limitations under the License.
-"""Prepare data for Plotly Dash."""
+"""Prepare data for Plotly Dash applications.
+"""
import logging
-from time import time
-
import awswrangler as wr
from yaml import load, FullLoader, YAMLError
+from datetime import datetime, timedelta
+from time import time
+from pytz import UTC
+from pandas import DataFrame
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
class Data:
- """
+    """Get data from parquets and store it for further use by Dash
+    applications.
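+
+    A minimal illustrative use (the spec file name below is only a
+    placeholder)::
+
+        data = Data(data_spec_file="data_spec.yaml", debug=True)
+        df_mrr = data.read_trending_mrr(days=30)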
"""
- def __init__(self, data_spec_file, debug=False):
- """
+ def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+ """Initialize the Data object.
+
+ :param data_spec_file: Path to file specifying the data to be read from
+ parquets.
+        :param debug: If True, debug information is printed to stdout.
+ :type data_spec_file: str
+ :type debug: bool
+ :raises RuntimeError: if it is not possible to open data_spec_file or it
+ is not a valid yaml file.
"""
# Inputs:
def data(self):
return self._data
- def _get_columns(self, parquet):
+ def _get_columns(self, parquet: str) -> list:
+        """Get the list of columns to be read from the given parquet, as
+        defined in the data specification file.
+
+ :param parquet: The parquet's name.
+ :type parquet: str
+ :raises RuntimeError: if the parquet is not defined in the data
+ specification file or it does not have any columns specified.
+ :returns: List of columns.
+ :rtype: list
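+
+        The data specification file is expected to hold one entry per parquet
+        with ``path`` and ``columns`` keys, e.g. (values below are only
+        illustrative)::
+
+            trending-mrr:
+              path: s3://<bucket>/<path-to-parquet>
+              columns:
+                - job
+                - build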
+ """
+
try:
return self._data_spec[parquet]["columns"]
except KeyError as err:
f"specified.\n{err}"
)
- def _get_path(self, parquet):
+ def _get_path(self, parquet: str) -> str:
+        """Get the path to the given parquet, as defined in the data
+        specification file.
+
+ :param parquet: The parquet's name.
+ :type parquet: str
+ :raises RuntimeError: if the parquet is not defined in the data
+ specification file or it does not have the path specified.
+ :returns: Path.
+ :rtype: str
+ """
+
try:
return self._data_spec[parquet]["path"]
except KeyError as err:
)
def _create_dataframe_from_parquet(self,
- path, partition_filter=None, columns=None,
- validate_schema=False, last_modified_begin=None,
- last_modified_end=None):
+ path, partition_filter=None,
+ columns=None,
+ validate_schema=False,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None) -> DataFrame:
"""Read parquet stored in S3 compatible storage and returns Pandas
Dataframe.
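+
+        :param days: If given, read only files modified within the last
+            ``days`` days (this overrides ``last_modified_begin``).
+        :type days: int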
"""
df = None
start = time()
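+        # Translate `days` into a lower bound on the file modification time.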
+ if days:
+ last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
try:
df = wr.s3.read_parquet(
path=path,
self._data = df
return df
- def read_stats(self):
- """Read Suite Result Analysis data partition from parquet.
+ def read_stats(self, days: int=None) -> tuple:
+ """Read statistics from parquet.
+
+ It reads from:
+ - Suite Result Analysis (SRA) partition,
+ - NDRPDR trending partition,
+ - MRR trending partition.
+
+        :param days: Number of days into the past for which the data will be
+            read.
+        :type days: int
+        :returns: Tuple of pandas DataFrames with the data read from the
+            specified parquets.
+        :rtype: tuple of pandas DataFrames
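+
+        Illustrative use (assumes an already initialized ``Data`` object)::
+
+            df_stats, df_mrr, df_ndrpdr = data.read_stats(days=180)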
"""
- lambda_f = lambda part: True if part["stats_type"] == "sra" else False
- return self._create_dataframe_from_parquet(
- path=self._get_path("statistics"),
- partition_filter=lambda_f,
- columns=None # Get all columns.
+ l_stats = lambda part: True if part["stats_type"] == "sra" else False
+ l_mrr = lambda part: True if part["test_type"] == "mrr" else False
+ l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
+
+ return (
+ self._create_dataframe_from_parquet(
+ path=self._get_path("statistics"),
+ partition_filter=l_stats,
+ columns=self._get_columns("statistics"),
+ days=days
+ ),
+ self._create_dataframe_from_parquet(
+ path=self._get_path("statistics-trending-mrr"),
+ partition_filter=l_mrr,
+ columns=self._get_columns("statistics-trending-mrr"),
+ days=days
+ ),
+ self._create_dataframe_from_parquet(
+ path=self._get_path("statistics-trending-ndrpdr"),
+ partition_filter=l_ndrpdr,
+ columns=self._get_columns("statistics-trending-ndrpdr"),
+ days=days
+ )
)
- def read_trending_mrr(self):
+ def read_trending_mrr(self, days: int=None) -> DataFrame:
"""Read MRR data partition from parquet.
+
+        :param days: Number of days into the past for which the data will be
+            read.
+ :type days: int
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
"""
+
lambda_f = lambda part: True if part["test_type"] == "mrr" else False
return self._create_dataframe_from_parquet(
path=self._get_path("trending-mrr"),
partition_filter=lambda_f,
- columns=self._get_columns("trending-mrr")
+ columns=self._get_columns("trending-mrr"),
+ days=days
)
- def read_trending_ndrpdr(self):
+ def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
"""Read NDRPDR data partition from iterative parquet.
+
+        :param days: Number of days into the past for which the data will be
+            read.
+ :type days: int
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
"""
+
lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
return self._create_dataframe_from_parquet(
path=self._get_path("trending-ndrpdr"),
partition_filter=lambda_f,
- columns=self._get_columns("trending-ndrpdr")
+ columns=self._get_columns("trending-ndrpdr"),
+ days=days
)
- def read_iterative_mrr(self):
+ def read_iterative_mrr(self, release: str) -> DataFrame:
"""Read MRR data partition from iterative parquet.
+
+ :param release: The CSIT release from which the data will be read.
+ :type release: str
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
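+
+        Illustrative call (the release string below is only a placeholder)::
+
+            df = data.read_iterative_mrr(release="rls2206")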
"""
+
lambda_f = lambda part: True if part["test_type"] == "mrr" else False
return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-mrr"),
+ path=self._get_path("iterative-mrr").format(release=release),
partition_filter=lambda_f,
columns=self._get_columns("iterative-mrr")
)
- def read_iterative_ndrpdr(self):
+ def read_iterative_ndrpdr(self, release: str) -> DataFrame:
"""Read NDRPDR data partition from parquet.
+
+ :param release: The CSIT release from which the data will be read.
+ :type release: str
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
"""
+
lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-ndrpdr"),
+ path=self._get_path("iterative-ndrpdr").format(release=release),
partition_filter=lambda_f,
columns=self._get_columns("iterative-ndrpdr")
)