"""Prepare data for Plotly Dash."""
+from datetime import datetime, timedelta
import logging
from time import time
import awswrangler as wr
+from pytz import UTC
from yaml import load, FullLoader, YAMLError
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
def _create_dataframe_from_parquet(self,
        path, partition_filter=None, columns=None,
        validate_schema=False, last_modified_begin=None,
-         last_modified_end=None):
+         last_modified_end=None, days=None):
    """Read parquet stored in S3-compatible storage and return a pandas
-     DataFrame.
+     DataFrame. If days is set, only files modified within that many
+     days are read.
    """
    df = None
    start = time()
+     # Limit the read to files modified within the last 'days' days.
+     if days:
+         last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
    try:
        df = wr.s3.read_parquet(
            path=path,
            dataset=True,
            partition_filter=partition_filter,
            columns=columns,
            validate_schema=validate_schema,
            last_modified_begin=last_modified_begin,
            last_modified_end=last_modified_end
        )
        logging.debug(f"Creation of dataframe {path} took {time() - start}s.")
    except NoFilesFound as err:
        logging.error(f"No parquet files found in {path}.\n{err}")
    except EmptyDataFrame as err:
        logging.error(f"No data in parquet files in {path}.\n{err}")
    self._data = df
    return df
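
For reference, the new days argument only derives a timezone-aware last_modified_begin cutoff, which wr.s3.read_parquet uses to skip S3 objects older than the window. A minimal standalone sketch of the same mechanism; the bucket path is hypothetical:

from datetime import datetime, timedelta

import awswrangler as wr
from pytz import UTC

# Read only parquet files modified in the last 5 days. The cutoff must
# be timezone-aware, hence tz=UTC.
cutoff = datetime.now(tz=UTC) - timedelta(days=5)
df = wr.s3.read_parquet(
    path="s3://example-bucket/statistics",  # hypothetical path
    last_modified_begin=cutoff
)
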
- def read_stats(self):
+ def read_stats(self, days=None):
-     """Read Suite Result Analysis data partition from parquet.
-     """
+     """Read Suite Result Analysis and trending statistics data
+     partitions from parquet.
+     """
-     lambda_f = lambda part: True if part["stats_type"] == "sra" else False
-
-     return self._create_dataframe_from_parquet(
-         path=self._get_path("statistics"),
-         partition_filter=lambda_f,
-         columns=None  # Get all columns.
+     l_stats = lambda part: part["stats_type"] == "sra"
+     l_mrr = lambda part: part["test_type"] == "mrr"
+     l_ndrpdr = lambda part: part["test_type"] == "ndrpdr"
+
+     return (
+         self._create_dataframe_from_parquet(
+             path=self._get_path("statistics"),
+             partition_filter=l_stats,
+             columns=self._get_columns("statistics"),
+             days=days
+         ),
+         self._create_dataframe_from_parquet(
+             path=self._get_path("statistics-trending"),
+             partition_filter=l_mrr,
+             columns=self._get_columns("statistics-trending"),
+             days=days
+         ),
+         self._create_dataframe_from_parquet(
+             path=self._get_path("statistics-trending"),
+             partition_filter=l_ndrpdr,
+             columns=self._get_columns("statistics-trending"),
+             days=days
+         )
    )
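
Since read_stats now returns a tuple of three DataFrames rather than one, callers unpack it. A usage sketch, assuming a hypothetical Data instance wrapping these methods (the class name and constructor are assumptions, not shown in this diff):

# Hypothetical instantiation.
data = Data(data_spec_file="data.yaml")

# One DataFrame per partition: SRA statistics, plus MRR and NDRPDR
# trending statistics, all limited to files from the last 30 days.
df_stats, df_mrr, df_ndrpdr = data.read_stats(days=30)
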
- def read_trending_mrr(self):
+ def read_trending_mrr(self, days=None):
    """Read MRR data partition from trending parquet.
    """
    lambda_f = lambda part: True if part["test_type"] == "mrr" else False
    return self._create_dataframe_from_parquet(
        path=self._get_path("trending-mrr"),
        partition_filter=lambda_f,
-         columns=self._get_columns("trending-mrr")
+         columns=self._get_columns("trending-mrr"),
+         days=days
    )
- def read_trending_ndrpdr(self):
+ def read_trending_ndrpdr(self, days=None):
    """Read NDRPDR data partition from trending parquet.
    """
    lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
    return self._create_dataframe_from_parquet(
        path=self._get_path("trending-ndrpdr"),
        partition_filter=lambda_f,
-         columns=self._get_columns("trending-ndrpdr")
+         columns=self._get_columns("trending-ndrpdr"),
+         days=days
    )
- def read_iterative_mrr(self):
+ def read_iterative_mrr(self, days=None):
    """Read MRR data partition from iterative parquet.
    """
    lambda_f = lambda part: True if part["test_type"] == "mrr" else False
    return self._create_dataframe_from_parquet(
        path=self._get_path("iterative-mrr"),
        partition_filter=lambda_f,
-         columns=self._get_columns("iterative-mrr")
+         columns=self._get_columns("iterative-mrr"),
+         days=days
    )
- def read_iterative_ndrpdr(self):
+ def read_iterative_ndrpdr(self, days=None):
    """Read NDRPDR data partition from iterative parquet.
    """
    lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
    return self._create_dataframe_from_parquet(
        path=self._get_path("iterative-ndrpdr"),
        partition_filter=lambda_f,
-         columns=self._get_columns("iterative-ndrpdr")
+         columns=self._get_columns("iterative-ndrpdr"),
+         days=days
    )
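
All of the readers rely on awswrangler's partition push-down: the partition_filter lambda receives a dict mapping partition column names to string values, once per partition, and partitions for which it returns False are never downloaded. Filtering only takes effect with dataset=True, which the reader above is assumed to pass. A minimal illustration; the path is hypothetical:

import awswrangler as wr

# awswrangler calls the filter once per Hive-style partition with a
# dict such as {"test_type": "mrr"}; returning False prunes that
# partition before any data is transferred.
df = wr.s3.read_parquet(
    path="s3://example-bucket/trending",  # hypothetical path
    dataset=True,  # required for partition_filter to take effect
    partition_filter=lambda part: part["test_type"] == "mrr"
)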