# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""ETL script running on top of the s3://"""

from datetime import datetime, timedelta
from json import load
from os import environ
from pytz import utc

import awswrangler as wr
from awswrangler.exceptions import EmptyDataFrame
from awsglue.context import GlueContext
from boto3 import session
from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, regexp_replace
from pyspark.sql.types import StructType


S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
SUFFIX="info.json.gz"
IGNORE_SUFFIX=[
    "suite.info.json.gz",
    "setup.info.json.gz",
    "teardown.info.json.gz",
    "suite.output.info.json.gz",
    "setup.output.info.json.gz",
    "teardown.output.info.json.gz"
]
LAST_MODIFIED_END=utc.localize(
    datetime.strptime(
        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
        "%Y-%m-%d"
    )
)
LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
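# The window above spans the previous UTC day: LAST_MODIFIED_END is today's
# midnight (UTC) and LAST_MODIFIED_BEGIN is 24 hours before it, so only
# objects modified yesterday are picked up by wr.s3.list_objects() below.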


def flatten_frame(nested_sdf):
    """Unnest Spark DataFrame in case there are nested structured columns.

    :param nested_sdf: Spark DataFrame.
    :type nested_sdf: DataFrame
    :returns: Unnested DataFrame.
    :rtype: DataFrame
    """
    stack = [((), nested_sdf)]
    columns = []
    while len(stack) > 0:
        parents, sdf = stack.pop()
        for column_name, column_type in sdf.dtypes:
            if column_type[:6] == "struct":
                projected_sdf = sdf.select(column_name + ".*")
                stack.append((parents + (column_name,), projected_sdf))
            else:
                columns.append(
                    col(".".join(parents + (column_name,))) \
                        .alias("_".join(parents + (column_name,)))
                )
    return nested_sdf.select(columns)
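# Example of flatten_frame() (hypothetical column layout): an input frame with
# a nested column `result: struct<type: string, rate: struct<value: double>>`
# is selected back as the flat columns `result_type` and `result_rate_value`.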


def process_json_to_dataframe(schema_name, paths):
    """Processes JSON to Spark DataFrame.

    :param schema_name: Schema name.
    :type schema_name: string
    :param paths: S3 paths to process.
    :type paths: list
    :returns: Spark DataFrame.
    :rtype: DataFrame
    """
    drop_subset = [
        "dut_type", "dut_version", "passed",
        "test_name_long", "test_name_short",
        "test_type", "version"
    ]

    # load schemas
    with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
        schema = StructType.fromJson(load(f_schema))

    # create empty DF out of schemas
    sdf = spark.createDataFrame([], schema)
    # filter list
    filtered = [path for path in paths if schema_name in path]
    # select
    for path in filtered:
        print(path)
        sdf_loaded = spark \
            .read \
            .option("multiline", "true") \
            .schema(schema) \
            .json(path) \
            .withColumn("job", lit(path.split("/")[4])) \
            .withColumn("build", lit(path.split("/")[5]))
        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)

    # drop rows with all nulls and drop rows with null in critical frames
    sdf = sdf.na.drop(how="all")
    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)

    # flatten frame
    sdf = flatten_frame(sdf)
    return sdf


# create SparkContext and GlueContext
spark_context = SparkContext.getOrCreate()
spark_context.setLogLevel("WARN")
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

# files of interest
paths = wr.s3.list_objects(
    path=PATH,
    suffix=SUFFIX,
    last_modified_begin=LAST_MODIFIED_BEGIN,
    last_modified_end=LAST_MODIFIED_END,
    ignore_suffix=IGNORE_SUFFIX,
    ignore_empty=True
)

filtered_paths = [path for path in paths if "report-coverage-2206" in path]
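# Only paths from the rls2206 report coverage jobs are kept; the loop below
# processes one test type at a time (mrr, ndrpdr, soak, device), each with its
# own JSON schema file, and writes the result into the coverage_rls2206
# parquet dataset.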

for schema_name in ["mrr", "ndrpdr", "soak", "device"]:
    out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
    out_sdf.printSchema()
    out_sdf = out_sdf \
        .withColumn("year", lit(datetime.now().year)) \
        .withColumn("month", lit(datetime.now().month)) \
        .withColumn("day", lit(datetime.now().day)) \
        .repartition(1)

    try:
        wr.s3.to_parquet(
            df=out_sdf.toPandas(),
            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2206",
            dataset=True,
            partition_cols=["test_type", "year", "month", "day"],
            compression="snappy",
            use_threads=True,
            mode="overwrite_partitions",
            boto3_session=session.Session(
                aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
                aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
                region_name=environ["OUT_AWS_DEFAULT_REGION"]
            )
        )
    except EmptyDataFrame:
        pass
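# Note: awswrangler raises EmptyDataFrame from to_parquet() when the converted
# pandas frame has no rows, so test types with no new data are skipped.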