# Copyright (c) 2024 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16 """ETL script running on top of the s3://"""
from datetime import datetime, timedelta
from json import load
from os import environ
from pytz import utc

import awswrangler as wr
from awswrangler.exceptions import EmptyDataFrame
from awsglue.context import GlueContext
from boto3 import session
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType

S3_LOGS_BUCKET=environ.get("S3_LOGS_BUCKET", "fdio-logs-s3-cloudfront-index")
S3_DOCS_BUCKET=environ.get("S3_DOCS_BUCKET", "fdio-docs-s3-cloudfront-index")
PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
SUFFIX="suite.info.json.gz"
IGNORE_SUFFIX=[]  # assumed empty; SUFFIX already narrows the listing
LAST_MODIFIED_END=utc.localize(
    datetime.strptime(
        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
        "%Y-%m-%d"
    )
)
LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
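# Together these bound the listing window to the previous day:
# from yesterday's midnight up to today's midnight (both treated as UTC).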

def process_json_to_dataframe(schema_name, paths):
    """Processes JSON to Spark DataFrame.

    :param schema_name: Schema name.
    :type schema_name: string
    :param paths: S3 paths to process.
    :type paths: list
    :returns: Spark DataFrame.
    :rtype: DataFrame
    """
    # columns that must be non-null for a row to survive the final drop
    # (assumed set; adjust to match the stats schema)
    drop_subset = [
        "duration",
        "version"
    ]
    # load the expected schema for this schema_name from a local JSON file
    with open(f"stats_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
        schema = StructType.fromJson(load(f_schema))
    # create empty DF out of schemas
    sdf = spark.createDataFrame([], schema)
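    # Starting from an empty, schema-typed DataFrame gives unionByName a
    # stable column set even when no input file matches the filter below.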

    # keep only per-test suite info files
    filtered = [path for path in paths if "tests/suite.info.json.gz" in path]

    # load each JSON file against the schema and union it into the result
    for path in filtered:
        print(path)

        sdf_loaded = spark \
            .read \
            .option("multiline", "true") \
            .schema(schema) \
            .json(path) \
            .withColumn("job", lit(path.split("/")[4])) \
            .withColumn("build", lit(path.split("/")[5])) \
            .withColumn("stats_type", lit(schema_name))
        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
    # drop rows with all nulls and drop rows with null in critical frames
    sdf = sdf.na.drop(how="all")
    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)

    return sdf

# create SparkContext and GlueContext
spark_context = SparkContext.getOrCreate()
spark_context.setLogLevel("WARN")
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session
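# NOTE: process_json_to_dataframe() above reads this module-level `spark`
# session, so it must exist before the function is called.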

# list the suite info files modified within the listing window
paths = wr.s3.list_objects(
    path=PATH,
    suffix=SUFFIX,
    last_modified_begin=LAST_MODIFIED_BEGIN,
    last_modified_end=LAST_MODIFIED_END,
    ignore_suffix=IGNORE_SUFFIX,
    ignore_empty=True
)

for schema_name in ["sra"]:
    out_sdf = process_json_to_dataframe(schema_name, paths)
    out_sdf.printSchema()
    # stamp the ingestion date; these columns become partition keys below
    out_sdf = out_sdf \
        .withColumn("year", lit(datetime.now().year)) \
        .withColumn("month", lit(datetime.now().month)) \
        .withColumn("day", lit(datetime.now().day)) \
        .repartition(1)
    # prefer dedicated output credentials when provided, otherwise fall
    # back to the default credential chain
    try:
        boto3_session = session.Session(
            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
            region_name=environ["OUT_AWS_DEFAULT_REGION"]
        )
    except KeyError:
        boto3_session = session.Session()
    # write the result as a partitioned Parquet dataset; an empty result
    # (no matching files) is silently skipped
    try:
        wr.s3.to_parquet(
            df=out_sdf.toPandas(),
            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/stats",
            dataset=True,
            partition_cols=["stats_type", "year", "month", "day"],
            compression="snappy",
            use_threads=True,
            mode="overwrite_partitions",
            boto3_session=boto3_session
        )
    except EmptyDataFrame:
        pass