"""
import logging
+import resource
import awswrangler as wr
+import pandas as pd
+import pyarrow as pa
from yaml import load, FullLoader, YAMLError
from datetime import datetime, timedelta
from time import time
from pytz import UTC
-from pandas import DataFrame
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
+from pyarrow.lib import ArrowInvalid, ArrowNotImplementedError
+
+from ..utils.constants import Constants as C
class Data:
applications.
"""
- def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+ def __init__(self, data_spec_file: str) -> None:
"""Initialize the Data object.
:param data_spec_file: Path to file specifying the data to be read from
parquets.
- :param debug: If True, the debuf information is printed to stdout.
:type data_spec_file: str
- :type debug: bool
:raises RuntimeError: if it is not possible to open data_spec_file or it
is not a valid yaml file.
"""
# Inputs:
self._data_spec_file = data_spec_file
- self._debug = debug
# Specification of data to be read from parquets:
- self._data_spec = None
+ self._data_spec = list()
# Data frame to keep the data:
- self._data = None
+ self._data = {
+ "statistics": pd.DataFrame(),
+ "trending": pd.DataFrame(),
+ "iterative": pd.DataFrame(),
+ "coverage": pd.DataFrame()
+ }
# Read from files:
try:
def data(self):
return self._data
- def _get_columns(self, parquet: str) -> list:
- """Get the list of columns from the data specification file to be read
- from parquets.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have any columns specified.
- :returns: List of columns.
- :rtype: list
- """
-
- try:
- return self._data_spec[parquet]["columns"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have any columns "
- f"specified.\n{err}"
- )
-
- def _get_path(self, parquet: str) -> str:
- """Get the path from the data specification file to be read from
- parquets.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have the path specified.
- :returns: Path.
- :rtype: str
- """
-
- try:
- return self._data_spec[parquet]["path"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have the path "
- f"specified.\n{err}"
- )
-
- def _get_list_of_files(self,
- path,
- last_modified_begin=None,
- last_modified_end=None,
- days=None) -> list:
+ @staticmethod
+ def _get_list_of_files(
+ path,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None
+ ) -> list:
"""Get list of interested files stored in S3 compatible storage and
returns it.
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end
)
- if self._debug:
- logging.info("\n".join(file_list))
+ logging.debug("\n".join(file_list))
except NoFilesFound as err:
logging.error(f"No parquets found.\n{err}")
except EmptyDataFrame as err:
return file_list
- def _create_dataframe_from_parquet(self,
- path, partition_filter=None,
- columns=None,
- validate_schema=False,
- last_modified_begin=None,
- last_modified_end=None,
- days=None) -> DataFrame:
+ def _validate_columns(self, data_type: str) -> str:
+ """Check if all columns are present in the dataframe.
+
+ :param data_type: The data type defined in data.yaml
+ :type data_type: str
+ :returns: Error message if validation fails, otherwise empty string.
+ :rtype: str
+ """
+ defined_columns = set()
+ for data_set in self._data_spec:
+ if data_set.get("data_type", str()) == data_type:
+ defined_columns.update(data_set.get("columns", set()))
+
+ if not defined_columns:
+ return "No columns defined in the data set(s)."
+
+ if self.data[data_type].empty:
+ return "No data."
+
+ ret_msg = str()
+ for col in defined_columns:
+ if col not in self.data[data_type].columns:
+ if not ret_msg:
+ ret_msg = "Missing columns: "
+ else:
+ ret_msg += ", "
+ ret_msg += f"{col}"
+ return ret_msg
+
+ @staticmethod
+ def _write_parquet_schema(
+ path,
+ partition_filter=None,
+ columns=None,
+ validate_schema=False,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None
+ ) -> None:
+ """Auxiliary function to write parquet schemas. Use it instead of
+ "_create_dataframe_from_parquet" in "read_all_data".
+
+ :param path: S3 prefix (accepts Unix shell-style wildcards)
+ (e.g. s3://bucket/prefix) or list of S3 objects paths
+ (e.g. [s3://bucket/key0, s3://bucket/key1]).
+ :param partition_filter: Callback Function filters to apply on PARTITION
+ columns (PUSH-DOWN filter). This function MUST receive a single
+ argument (Dict[str, str]) where keys are partitions names and values
+ are partitions values. Partitions values will be always strings
+ extracted from S3. This function MUST return a bool, True to read
+ the partition or False to ignore it. Ignored if dataset=False.
+ :param columns: Names of columns to read from the file(s).
+ :param validate_schema: Check that individual file schemas are all the
+ same / compatible. Schemas within a folder prefix should all be the
+ same. Disable if you have schemas that are different and want to
+ disable this check.
+ :param last_modified_begin: Filter the s3 files by the Last modified
+ date of the object. The filter is applied only after list all s3
+ files.
+ :param last_modified_end: Filter the s3 files by the Last modified date
+ of the object. The filter is applied only after list all s3 files.
+ :param days: Number of days to filter.
+ :type path: Union[str, List[str]]
+ :type partition_filter: Callable[[Dict[str, str]], bool], optional
+ :type columns: List[str], optional
+ :type validate_schema: bool, optional
+ :type last_modified_begin: datetime, optional
+ :type last_modified_end: datetime, optional
+ :type days: integer, optional
+ """
+ if days:
+ last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
+
+ df = wr.s3.read_parquet(
+ path=path,
+ path_suffix="parquet",
+ ignore_empty=True,
+ validate_schema=validate_schema,
+ use_threads=True,
+ dataset=True,
+ columns=columns,
+ partition_filter=partition_filter,
+ last_modified_begin=last_modified_begin,
+ last_modified_end=last_modified_end,
+ chunked=1
+ )
+
+ for itm in df:
+ try:
+ # Specify the condition or remove it:
+ if pd.api.types.is_string_dtype(itm["result_rate_unit"]):
+ print(pa.Schema.from_pandas(itm))
+ pa.parquet.write_metadata(
+ pa.Schema.from_pandas(itm),
+ f"{C.PATH_TO_SCHEMAS}_tmp_schema"
+ )
+ print(itm)
+ break
+ except KeyError:
+ pass
+
+ @staticmethod
+ def _create_dataframe_from_parquet(
+ path,
+ partition_filter=None,
+ columns=None,
+ validate_schema=False,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None,
+ schema=None
+ ) -> pd.DataFrame:
"""Read parquet stored in S3 compatible storage and returns Pandas
Dataframe.
:param last_modified_end: Filter the s3 files by the Last modified date
of the object. The filter is applied only after list all s3 files.
:param days: Number of days to filter.
+ :param schema: Path to schema to use when reading data from the parquet.
:type path: Union[str, List[str]]
:type partition_filter: Callable[[Dict[str, str]], bool], optional
:type columns: List[str], optional
:type last_modified_begin: datetime, optional
:type last_modified_end: datetime, optional
:type days: integer, optional
+ :type schema: string
:returns: Pandas DataFrame or None if DataFrame cannot be fetched.
:rtype: DataFrame
"""
- df = None
+ df = pd.DataFrame()
start = time()
if days:
last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
path=path,
path_suffix="parquet",
ignore_empty=True,
+ schema=schema,
validate_schema=validate_schema,
use_threads=True,
dataset=True,
columns=columns,
partition_filter=partition_filter,
last_modified_begin=last_modified_begin,
- last_modified_end=last_modified_end
+ last_modified_end=last_modified_end,
+ dtype_backend="pyarrow"
)
- if self._debug:
- df.info(verbose=True, memory_usage='deep')
- logging.info(
- f"\nCreation of dataframe {path} took: {time() - start}\n"
- )
+
+ df.info(verbose=True, memory_usage="deep")
+ logging.debug(
+ f"\nCreation of dataframe {path} took: {time() - start}\n"
+ )
+ except (ArrowInvalid, ArrowNotImplementedError) as err:
+ logging.error(f"Reading of data from parquets FAILED.\n{repr(err)}")
except NoFilesFound as err:
- logging.error(f"No parquets found.\n{err}")
+ logging.error(
+ f"Reading of data from parquets FAILED.\n"
+ f"No parquets found in specified time period.\n"
+ f"Nr of days: {days}\n"
+ f"last_modified_begin: {last_modified_begin}\n"
+ f"{repr(err)}"
+ )
except EmptyDataFrame as err:
- logging.error(f"No data.\n{err}")
+ logging.error(
+ f"Reading of data from parquets FAILED.\n"
+ f"No data in parquets in specified time period.\n"
+ f"Nr of days: {days}\n"
+ f"last_modified_begin: {last_modified_begin}\n"
+ f"{repr(err)}"
+ )
- self._data = df
return df
- def check_datasets(self, days: int=None):
- """Read structure from parquet.
+ def read_all_data(self, days: int=None) -> dict:
+ """Read all data necessary for all applications.
- :param days: Number of days back to the past for which the data will be
- read.
+ :param days: Number of days to filter. If None, all data will be
+ downloaded.
:type days: int
+ :returns: A dictionary where keys are names of parquets and values are
+ the pandas dataframes with fetched data.
+ :rtype: dict(str: pandas.DataFrame)
"""
- self._get_list_of_files(path=self._get_path("trending"), days=days)
- self._get_list_of_files(path=self._get_path("statistics"), days=days)
- def read_stats(self, days: int=None) -> tuple:
- """Read statistics from parquet.
-
- It reads from:
- - Suite Result Analysis (SRA) partition,
- - NDRPDR trending partition,
- - MRR trending partition.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: tuple of pandas DataFrame-s with data read from specified
- parquets.
- :rtype: tuple of pandas DataFrame-s
- """
-
- l_stats = lambda part: True if part["stats_type"] == "sra" else False
- l_mrr = lambda part: True if part["test_type"] == "mrr" else False
- l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
-
- return (
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics"),
- partition_filter=l_stats,
- columns=self._get_columns("statistics"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-mrr"),
- partition_filter=l_mrr,
- columns=self._get_columns("statistics-trending-mrr"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-ndrpdr"),
- partition_filter=l_ndrpdr,
- columns=self._get_columns("statistics-trending-ndrpdr"),
- days=days
+ data_lists = {
+ "statistics": list(),
+ "trending": list(),
+ "iterative": list(),
+ "coverage": list()
+ }
+
+ logging.info("\n\nReading data:\n" + "-" * 13 + "\n")
+ for data_set in self._data_spec:
+ logging.info(
+ f"\n\nReading data for {data_set['data_type']} "
+ f"{data_set['partition_name']} {data_set.get('release', '')}\n"
)
- )
-
- def read_trending_mrr(self, days: int=None) -> DataFrame:
- """Read MRR data partition from parquet.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
- lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-mrr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-mrr"),
- days=days
- )
-
- def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
- """Read NDRPDR data partition from iterative parquet.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
- lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-ndrpdr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-ndrpdr"),
- days=days
- )
-
- def read_iterative_mrr(self, release: str) -> DataFrame:
- """Read MRR data partition from iterative parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
+ schema_file = data_set.get("schema", None)
+ if schema_file:
+ try:
+ schema = pa.parquet.read_schema(
+ f"{C.PATH_TO_SCHEMAS}{schema_file}"
+ )
+ except FileNotFoundError as err:
+ logging.error(repr(err))
+ logging.error("Proceeding without schema.")
+ schema = None
+ else:
+ schema = None
+ partition_filter = lambda part: True \
+ if part[data_set["partition"]] == data_set["partition_name"] \
+ else False
+ if data_set["data_type"] in ("trending", "statistics"):
+ time_period = days
+ else:
+ time_period = None
+ data = Data._create_dataframe_from_parquet(
+ path=data_set["path"],
+ partition_filter=partition_filter,
+ columns=data_set.get("columns", None),
+ days=time_period,
+ schema=schema
+ )
+ if data_set["data_type"] in ("iterative", "coverage"):
+ data["release"] = data_set["release"]
+ data["release"] = data["release"].astype("category")
- lambda_f = lambda part: True if part["test_type"] == "mrr" else False
+ data_lists[data_set["data_type"]].append(data)
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-mrr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-mrr")
+ logging.info(
+ "\n\nData post-processing, validation and summary:\n" +
+ "-" * 45 + "\n"
)
+ for key in self._data.keys():
+ logging.info(f"\n\nDataframe {key}:\n")
+ self._data[key] = pd.concat(
+ data_lists[key],
+ ignore_index=True,
+ copy=False
+ )
+ self._data[key].info(verbose=True, memory_usage="deep")
+ err_msg = self._validate_columns(key)
+ if err_msg:
+ self._data[key] = pd.DataFrame()
+ logging.error(
+ f"Data validation FAILED.\n"
+ f"{err_msg}\n"
+ "Generated dataframe replaced by an empty dataframe."
+ )
- def read_iterative_ndrpdr(self, release: str) -> DataFrame:
- """Read NDRPDR data partition from parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
- lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+ mem_alloc = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
+ logging.info(f"\n\nMemory allocation: {mem_alloc:.0f}MB\n")
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-ndrpdr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-ndrpdr")
- )
+ return self._data