C-Dash: Update requirements file
[csit.git] / csit.infra.dash / app / cdash / data / data.py
index a0d698e..2bf3649 100644 (file)
@@ -18,12 +18,16 @@ import logging
 import resource
 import awswrangler as wr
 import pandas as pd
+import pyarrow as pa
 
 from yaml import load, FullLoader, YAMLError
 from datetime import datetime, timedelta
 from time import time
 from pytz import UTC
 from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
+from pyarrow.lib import ArrowInvalid, ArrowNotImplementedError
+
+from ..utils.constants import Constants as C
 
 
 class Data:
@@ -118,14 +122,117 @@ class Data:
 
         return file_list
 
+    def _validate_columns(self, data_type: str) -> str:
+        """Check if all columns are present in the dataframe.
+
+        :param data_type: The data type defined in data.yaml
+        :type data_type: str
+        :returns: Error message if validation fails, otherwise empty string.
+        :rtype: str
+        """
+        defined_columns = set()
+        for data_set in self._data_spec:
+            if data_set.get("data_type", str()) == data_type:
+                defined_columns.update(data_set.get("columns", set()))
+
+        if not defined_columns:
+            return "No columns defined in the data set(s)."
+
+        if self.data[data_type].empty:
+            return "No data."
+
+        ret_msg = str()
+        for col in defined_columns:
+            if col not in self.data[data_type].columns:
+                if not ret_msg:
+                    ret_msg = "Missing columns: "
+                else:
+                    ret_msg += ", "
+                ret_msg += f"{col}"
+        return ret_msg
+
     @staticmethod
-    def _create_dataframe_from_parquet(
-            path, partition_filter=None,
+    def _write_parquet_schema(
+            path,
+            partition_filter=None,
             columns=None,
             validate_schema=False,
             last_modified_begin=None,
             last_modified_end=None,
             days=None
+        ) -> None:
+        """Auxiliary function to write parquet schemas. Use it instead of
+        "_create_dataframe_from_parquet" in "read_all_data".
+
+        :param path: S3 prefix (accepts Unix shell-style wildcards)
+            (e.g. s3://bucket/prefix) or list of S3 objects paths
+            (e.g. [s3://bucket/key0, s3://bucket/key1]).
+        :param partition_filter: Callback Function filters to apply on PARTITION
+            columns (PUSH-DOWN filter). This function MUST receive a single
+            argument (Dict[str, str]) where keys are partitions names and values
+            are partitions values. Partitions values will be always strings
+            extracted from S3. This function MUST return a bool, True to read
+            the partition or False to ignore it. Ignored if dataset=False.
+        :param columns: Names of columns to read from the file(s).
+        :param validate_schema: Check that individual file schemas are all the
+            same / compatible. Schemas within a folder prefix should all be the
+            same. Disable if you have schemas that are different and want to
+            disable this check.
+        :param last_modified_begin: Filter the s3 files by the Last modified
+            date of the object. The filter is applied only after list all s3
+            files.
+        :param last_modified_end: Filter the s3 files by the Last modified date
+            of the object. The filter is applied only after list all s3 files.
+        :param days: Number of days to filter.
+        :type path: Union[str, List[str]]
+        :type partition_filter: Callable[[Dict[str, str]], bool], optional
+        :type columns: List[str], optional
+        :type validate_schema: bool, optional
+        :type last_modified_begin: datetime, optional
+        :type last_modified_end: datetime, optional
+        :type days: integer, optional
+        """
+        if days:
+            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
+
+        df = wr.s3.read_parquet(
+            path=path,
+            path_suffix="parquet",
+            ignore_empty=True,
+            validate_schema=validate_schema,
+            use_threads=True,
+            dataset=True,
+            columns=columns,
+            partition_filter=partition_filter,
+            last_modified_begin=last_modified_begin,
+            last_modified_end=last_modified_end,
+            chunked=1
+        )
+
+        for itm in df:
+            try:
+                # Specify the condition or remove it:
+                if pd.api.types.is_string_dtype(itm["result_rate_unit"]):
+                    print(pa.Schema.from_pandas(itm))
+                    pa.parquet.write_metadata(
+                        pa.Schema.from_pandas(itm),
+                        f"{C.PATH_TO_SCHEMAS}_tmp_schema"
+                    )
+                    print(itm)
+                    break
+            except KeyError:
+                pass
+
+    @staticmethod
+    def _create_dataframe_from_parquet(
+            path,
+            partition_filter=None,
+            columns=None,
+            validate_schema=False,
+            last_modified_begin=None,
+            last_modified_end=None,
+            days=None,
+            schema=None
         ) -> pd.DataFrame:
         """Read parquet stored in S3 compatible storage and returns Pandas
         Dataframe.
@@ -150,6 +257,7 @@ class Data:
         :param last_modified_end: Filter the s3 files by the Last modified date
             of the object. The filter is applied only after list all s3 files.
         :param days: Number of days to filter.
+        :param schema: Path to schema to use when reading data from the parquet.
         :type path: Union[str, List[str]]
         :type partition_filter: Callable[[Dict[str, str]], bool], optional
         :type columns: List[str], optional
@@ -157,6 +265,7 @@ class Data:
         :type last_modified_begin: datetime, optional
         :type last_modified_end: datetime, optional
         :type days: integer, optional
+        :type schema: string
         :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
         :rtype: DataFrame
         """
@@ -169,31 +278,38 @@ class Data:
                 path=path,
                 path_suffix="parquet",
                 ignore_empty=True,
+                schema=schema,
                 validate_schema=validate_schema,
                 use_threads=True,
                 dataset=True,
                 columns=columns,
                 partition_filter=partition_filter,
                 last_modified_begin=last_modified_begin,
-                last_modified_end=last_modified_end
+                last_modified_end=last_modified_end,
+                dtype_backend="pyarrow"
             )
+
             df.info(verbose=True, memory_usage="deep")
             logging.debug(
                 f"\nCreation of dataframe {path} took: {time() - start}\n"
             )
+        except (ArrowInvalid, ArrowNotImplementedError) as err:
+            logging.error(f"Reading of data from parquets FAILED.\n{repr(err)}")
         except NoFilesFound as err:
             logging.error(
+                f"Reading of data from parquets FAILED.\n"
                 f"No parquets found in specified time period.\n"
                 f"Nr of days: {days}\n"
                 f"last_modified_begin: {last_modified_begin}\n"
-                f"{err}"
+                f"{repr(err)}"
             )
         except EmptyDataFrame as err:
             logging.error(
+                f"Reading of data from parquets FAILED.\n"
                 f"No data in parquets in specified time period.\n"
                 f"Nr of days: {days}\n"
                 f"last_modified_begin: {last_modified_begin}\n"
-                f"{err}"
+                f"{repr(err)}"
             )
 
         return df
@@ -209,15 +325,31 @@ class Data:
         :rtype: dict(str: pandas.DataFrame)
         """
 
-        lst_trending = list()
-        lst_iterative = list()
-        lst_coverage = list()
+        data_lists = {
+            "statistics": list(),
+            "trending": list(),
+            "iterative": list(),
+            "coverage": list()
+        }
 
+        logging.info("\n\nReading data:\n" + "-" * 13 + "\n")
         for data_set in self._data_spec:
             logging.info(
-                f"Reading data for {data_set['data_type']} "
-                f"{data_set['partition_name']} {data_set.get('release', '')}"
+                f"\n\nReading data for {data_set['data_type']} "
+                f"{data_set['partition_name']} {data_set.get('release', '')}\n"
             )
+            schema_file = data_set.get("schema", None)
+            if schema_file:
+                try:
+                    schema = pa.parquet.read_schema(
+                        f"{C.PATH_TO_SCHEMAS}{schema_file}"
+                    )
+                except FileNotFoundError as err:
+                    logging.error(repr(err))
+                    logging.error("Proceeding without schema.")
+                    schema = None
+            else:
+                schema = None
             partition_filter = lambda part: True \
                 if part[data_set["partition"]] == data_set["partition_name"] \
                     else False
@@ -229,51 +361,37 @@ class Data:
                 path=data_set["path"],
                 partition_filter=partition_filter,
                 columns=data_set.get("columns", None),
-                days=time_period
+                days=time_period,
+                schema=schema
             )
-
-            if data_set["data_type"] == "statistics":
-                self._data["statistics"] = data
-            elif data_set["data_type"] == "trending":
-                lst_trending.append(data)
-            elif data_set["data_type"] == "iterative":
-                data["release"] = data_set["release"]
-                data["release"] = data["release"].astype("category")
-                lst_iterative.append(data)
-            elif data_set["data_type"] == "coverage":
+            if data_set["data_type"] in ("iterative", "coverage"):
                 data["release"] = data_set["release"]
                 data["release"] = data["release"].astype("category")
-                lst_coverage.append(data)
-            else:
-                raise NotImplementedError(
-                    f"The data type {data_set['data_type']} is not implemented."
-                )
 
-        self._data["iterative"] = pd.concat(
-            lst_iterative,
-            ignore_index=True,
-            copy=False
-        )
-        self._data["trending"] = pd.concat(
-            lst_trending,
-            ignore_index=True,
-            copy=False
-        )
-        self._data["coverage"] = pd.concat(
-            lst_coverage,
-            ignore_index=True,
-            copy=False
-        )
+            data_lists[data_set["data_type"]].append(data)
 
+        logging.info(
+            "\n\nData post-processing, validation and summary:\n" +
+            "-" * 45 + "\n"
+        )
         for key in self._data.keys():
-            logging.info(
-                f"\nData frame {key}:"
-                f"\n{self._data[key].memory_usage(deep=True)}\n"
-            )
+            logging.info(f"\n\nDataframe {key}:\n")
+            self._data[key] = pd.concat(
+                data_lists[key],
+                ignore_index=True,
+                copy=False
+            )    
             self._data[key].info(verbose=True, memory_usage="deep")
+            err_msg = self._validate_columns(key)
+            if err_msg:
+                self._data[key] = pd.DataFrame()
+                logging.error(
+                    f"Data validation FAILED.\n"
+                    f"{err_msg}\n"
+                    "Generated dataframe replaced by an empty dataframe."
+                )
 
-        mem_alloc = \
-            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
-        logging.info(f"Memory allocation: {mem_alloc:.0f}MB")
+        mem_alloc = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
+        logging.info(f"\n\nMemory allocation: {mem_alloc:.0f}MB\n")
 
         return self._data