# Copyright (c) 2023 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16 """ETL script running on top of the localhost"""
from datetime import datetime
from json import dump, load
from pathlib import Path

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, regexp_replace
from pyspark.sql.types import StructType
34 "suite.output.info.json",
35 "setup.output.info.json",
36 "teardown.output.info.json"
def schema_dump(schema, option):
    """Dumps Spark DataFrame schema into JSON file.

    :param schema: DataFrame schema.
    :type schema: StructType
    :param option: File name suffix for the DataFrame schema.
    :type option: string
    """
    with open(f"trending_{option}.json", "w", encoding="UTF-8") as f_schema:
        dump(schema.jsonValue(), f_schema, indent=4, sort_keys=True)


def schema_load(option):
    """Loads Spark DataFrame schema from JSON file.

    :param option: File name suffix for the DataFrame schema.
    :type option: string
    :returns: DataFrame schema.
    :rtype: StructType
    """
    with open(f"trending_{option}.json", "r", encoding="UTF-8") as f_schema:
        return StructType.fromJson(load(f_schema))
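
# Schema round-trip note: schema_dump(sdf.schema, "mrr") writes
# trending_mrr.json and schema_load("mrr") reads it back as an equivalent
# StructType; sort_keys=True keeps the dumped JSON stable across runs.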


def schema_dump_from_json(option):
    """Loads JSON with data and dumps Spark DataFrame schema into JSON file.

    :param option: File name suffix for the JSON data.
    :type option: string
    """
    schema_dump(spark \
        .read \
        .option("multiline", "true") \
        .json(f"data_{option}.json") \
        .schema, option
    )


def flatten_frame(nested_sdf):
    """Unnest Spark DataFrame in case there are nested structured columns.

    :param nested_sdf: Spark DataFrame.
    :type nested_sdf: DataFrame
    :returns: Unnested DataFrame.
    :rtype: DataFrame
    """
    # iterative depth-first walk over struct columns; leaf columns are
    # collected as fully qualified references
    stack = [((), nested_sdf)]
    columns = []
    while len(stack) > 0:
        parents, sdf = stack.pop()
        for column_name, column_type in sdf.dtypes:
            if column_type[:6] == "struct":
                # descend into the struct, remembering the path
                projected_sdf = sdf.select(column_name + ".*")
                stack.append((parents + (column_name,), projected_sdf))
            else:
                # leaf: select by dotted path, alias by underscored path
                # so the flat name stays unique
                columns.append(
                    col(".".join(parents + (column_name,))) \
                    .alias("_".join(parents + (column_name,)))
                )
    return nested_sdf.select(columns)
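
# Illustration (column names hypothetical): a nested column such as
#     result.receive_rate.rate.values
# leaves flatten_frame() as the flat column
#     result_receive_rate_rate_values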


def process_json_to_dataframe(schema_name, paths):
    """Processes JSON to Spark DataFrame.

    :param schema_name: Schema name.
    :type schema_name: string
    :param paths: Local paths to process.
    :type paths: list
    :returns: Spark DataFrame.
    :rtype: DataFrame
    """
    drop_subset = [
        "dut_type", "dut_version",
        "passed",
        "test_name_long", "test_name_short",
        "test_type",
        "version"
    ]

    # load schema and create empty DF out of schema
    schema = schema_load(schema_name)
    sdf = spark.createDataFrame([], schema)

    # keep only the paths belonging to this schema / test type
    filtered = [path for path in paths if schema_name in path]

    # load each file against the schema and accumulate into a single frame;
    # unionByName with allowMissingColumns=True aligns columns by name and
    # fills columns missing on either side with nulls
    for path in filtered:
        sdf_loaded = spark \
            .read \
            .option("multiline", "true") \
            .schema(schema) \
            .json(path) \
            .withColumn("job", lit("local")) \
            .withColumn("build", lit("unknown"))
        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)

    # drop rows with all nulls and drop rows with null in critical columns
    sdf = sdf.na.drop(how="all")
    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)

    # flatten frame
    sdf = flatten_frame(sdf)

    return sdf
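
# Minimal usage sketch, assuming a trending_mrr.json schema was dumped
# earlier and the path below (hypothetical) points at a downloaded result:
#
#     df = process_json_to_dataframe("mrr", ["tests/mrr.info.json"])
#     df.printSchema()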


# create SparkContext and GlueContext
spark_context = SparkContext.getOrCreate()
spark_context.setLogLevel("WARN")
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session
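
# GlueContext only wraps the SparkContext here; spark_session is a regular
# SparkSession, so the rest of the script is plain PySpark.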

# files of interest
paths = []
for file in Path(PATH).glob(f"**/*{SUFFIX}"):
    if file.name not in IGNORE_SUFFIX:
        paths.append(str(file))

for schema_name in ["mrr", "ndrpdr", "soak"]:
    out_sdf = process_json_to_dataframe(schema_name, paths)
    out_sdf.printSchema()
    out_sdf \
        .withColumn("year", lit(datetime.now().year)) \
        .withColumn("month", lit(datetime.now().month)) \
        .withColumn("day", lit(datetime.now().day)) \
        .repartition(1) \
        .write \
        .partitionBy("test_type", "year", "month", "day") \
        .mode("append") \
        .parquet("local.parquet")
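
# partitionBy() produces the usual Hive-style directory tree under
# local.parquet/, e.g. (values illustrative):
#     local.parquet/test_type=mrr/year=2023/month=6/day=1/part-*.parquet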