C-Dash: Add bandwidth to MRR iterative
[csit.git] / csit.infra.dash / app / cdash / data / data.py
index 2bf3649..ce98476 100644
@@ -30,6 +30,12 @@ from pyarrow.lib import ArrowInvalid, ArrowNotImplementedError
 from ..utils.constants import Constants as C
 
 
+# If True, a pyarrow.Schema is generated instead of reading the data. See
+# also the condition in the method _write_parquet_schema.
+# To generate a schema, select only one data set in the data.yaml file.
+GENERATE_SCHEMA = False
+
+
 class Data:
     """Gets the data from parquets and stores it for further use by dash
     applications.
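
When GENERATE_SCHEMA is True, a run only produces a metadata-only parquet file holding the derived schema (see the next hunk). A minimal sketch of inspecting such a file afterwards, assuming pyarrow is available and using a hypothetical local path in place of C.PATH_TO_SCHEMAS:

import pyarrow.parquet as pq

# Hypothetical path; the change writes to f"{C.PATH_TO_SCHEMAS}_tmp_schema".
schema = pq.read_schema("schemas/_tmp_schema")
# List the field names and pyarrow types recorded in the generated schema.
for field in schema:
    print(field.name, field.type)
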
@@ -212,13 +218,19 @@ class Data:
         for itm in df:
             try:
                 # Specify the condition or remove it:
-                if pd.api.types.is_string_dtype(itm["result_rate_unit"]):
-                    print(pa.Schema.from_pandas(itm))
+                if all((
+                        pd.api.types.is_string_dtype(itm["column_name"]),
+                        pd.api.types.is_string_dtype(itm["telemetry"][0])
+                    )):
+                    schema = pa.Schema.from_pandas(itm)
                     pa.parquet.write_metadata(
-                        pa.Schema.from_pandas(itm),
-                        f"{C.PATH_TO_SCHEMAS}_tmp_schema"
+                        schema, f"{C.PATH_TO_SCHEMAS}_tmp_schema"
                     )
-                    print(itm)
+                    logging.info(schema.to_string(
+                        truncate_metadata=False,
+                        show_field_metadata=True,
+                        show_schema_metadata=True
+                    ))
                     break
             except KeyError:
                 pass
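
The hunk above derives the schema from the first DataFrame chunk matching the condition and persists it. A minimal, self-contained sketch of that pattern, with a made-up DataFrame and the same "_tmp_schema" file name:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Made-up sample data; column names are illustrative only.
df = pd.DataFrame({
    "job": ["example-mrr-job"],
    "result_receive_rate_rate_avg": [1.2e7]
})
# Derive a pyarrow schema from the pandas DataFrame.
schema = pa.Schema.from_pandas(df)
# Persist it as a metadata-only parquet file.
pq.write_metadata(schema, "_tmp_schema")
print(schema.to_string(show_field_metadata=True))
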
@@ -357,6 +369,18 @@ class Data:
                 time_period = days
             else:
                 time_period = None
+
+            if GENERATE_SCHEMA:
+                # Generate schema:
+                Data._write_parquet_schema(
+                    path=data_set["path"],
+                    partition_filter=partition_filter,
+                    columns=data_set.get("columns", None),
+                    days=time_period
+                )
+                return
+
+            # Read data:
             data = Data._create_dataframe_from_parquet(
                 path=data_set["path"],
                 partition_filter=partition_filter,
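
When GENERATE_SCHEMA is False, the loop falls through to _create_dataframe_from_parquet, which is not part of this diff. A hedged sketch of how the path, partition_filter, columns and time period could be consumed by an awswrangler-style reader; whether the helper actually uses awswrangler is an assumption here, and all values are hypothetical:

from datetime import datetime, timedelta, timezone

import awswrangler as wr


def partition_filter(part: dict) -> bool:
    # Hypothetical push-down filter on partition columns.
    return part.get("test_type") == "mrr"


days = 180  # stands in for time_period above
df = wr.s3.read_parquet(
    path="s3://csit-example/trending",  # data_set["path"] in the loop above
    dataset=True,
    partition_filter=partition_filter,
    columns=["job", "build", "result_receive_rate_rate_avg"],
    last_modified_begin=datetime.now(tz=timezone.utc) - timedelta(days=days)
)
print(df.shape)
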