# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16 """ETL script running on top of the s3://"""

from datetime import datetime, timedelta
from json import load
from os import environ
from pytz import utc

import awswrangler as wr
from awswrangler.exceptions import EmptyDataFrame
from awsglue.context import GlueContext
from boto3 import session
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType
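
# NOTE: writing to S3 builds a boto3 session from OUT_AWS_ACCESS_KEY_ID,
# OUT_AWS_SECRET_ACCESS_KEY and OUT_AWS_DEFAULT_REGION; a missing variable
# raises KeyError at write time.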

S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
SUFFIX="suite.info.json.gz"
# Assumed empty; the definition was elided from this excerpt but the name
# is referenced by wr.s3.list_objects() below.
IGNORE_SUFFIX=[]
# Daily window: midnight UTC yesterday up to midnight UTC today.
LAST_MODIFIED_END=utc.localize(
    datetime.strptime(
        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
        "%Y-%m-%d"
    )
)
LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)


def process_json_to_dataframe(schema_name, paths):
    """Processes JSON to Spark DataFrame.

    :param schema_name: Schema name.
    :type schema_name: string
    :param paths: S3 paths to process.
    :type paths: list
    :returns: Spark DataFrame.
    :rtype: DataFrame
    """
    # Critical columns: rows with nulls here are dropped below. (Assumed
    # list; the original definition was elided from this excerpt.)
    drop_subset = ["duration", "version"]

    # load schema file
    with open(f"stats_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
        schema = StructType.fromJson(load(f_schema))
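    # NOTE: the schema file is opened by relative path, so it must be
    # available in the job's working directory alongside this script.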

    # create empty DF out of schemas
    sdf = spark.createDataFrame([], schema)
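    # An empty, schema-typed frame keeps unionByName below stable even when
    # no input paths match the filter.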

    # filter list
    filtered = [path for path in paths if "tests/suite.info.json.gz" in path]

    for path in filtered:
        sdf_loaded = spark \
            .read \
            .option("multiline", "true") \
            .schema(schema) \
            .json(path) \
            .withColumn("job", lit(path.split("/")[4])) \
            .withColumn("build", lit(path.split("/")[5])) \
            .withColumn("stats_type", lit(schema_name))
        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)

    # drop rows with all nulls and drop rows with null in critical frames
    sdf = sdf.na.drop(how="all")
    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)

    return sdf


# create SparkContext and GlueContext
spark_context = SparkContext.getOrCreate()
spark_context.setLogLevel("WARN")
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session
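
# files of interest: only objects modified within the daily window are listed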
paths = wr.s3.list_objects(
    path=PATH,
    suffix=SUFFIX,
    last_modified_begin=LAST_MODIFIED_BEGIN,
    last_modified_end=LAST_MODIFIED_END,
    ignore_suffix=IGNORE_SUFFIX,
    ignore_empty=True
)

for schema_name in ["sra"]:
    out_sdf = process_json_to_dataframe(schema_name, paths)
    out_sdf.show(truncate=False)
    out_sdf.printSchema()
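    # Stamp today's date onto every row; these columns (plus stats_type)
    # drive the Parquet partitioning below.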
    out_sdf = out_sdf \
        .withColumn("year", lit(datetime.now().year)) \
        .withColumn("month", lit(datetime.now().month)) \
        .withColumn("day", lit(datetime.now().day)) \
        .repartition(1)

    try:
        wr.s3.to_parquet(
            df=out_sdf.toPandas(),
            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/stats",
            dataset=True,
            partition_cols=["stats_type", "year", "month", "day"],
            compression="snappy",
            use_threads=True,
            mode="overwrite_partitions",
            boto3_session=session.Session(
                aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
                aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
                region_name=environ["OUT_AWS_DEFAULT_REGION"]
            )
        )
    except EmptyDataFrame:
        pass