d7b4c23edd09612186715cf3ab9e5586046ad5d2
[csit.git] / csit.infra.etl / stats.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2024 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """ETL script running on top of the s3://"""
17
18 from datetime import datetime, timedelta
19 from json import load
20 from os import environ
21 from pytz import utc
22
23 import awswrangler as wr
24 from awswrangler.exceptions import EmptyDataFrame
25 from awsglue.context import GlueContext
26 from boto3 import session
27 from pyspark.context import SparkContext
28 from pyspark.sql.functions import lit
29 from pyspark.sql.types import StructType
30
31
32 S3_LOGS_BUCKET=environ.get("S3_LOGS_BUCKET", "fdio-logs-s3-cloudfront-index")
33 S3_DOCS_BUCKET=environ.get("S3_DOCS_BUCKET", "fdio-docs-s3-cloudfront-index")
34 PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
35 SUFFIX="suite.info.json.gz"
36 IGNORE_SUFFIX=[]
37 LAST_MODIFIED_END=utc.localize(
38     datetime.strptime(
39         f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
40         "%Y-%m-%d"
41     )
42 )
43 LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
44
45
46 def process_json_to_dataframe(schema_name, paths):
47     """Processes JSON to Spark DataFrame.
48
49     :param schema_name: Schema name.
50     :type schema_name: string
51     :param paths: S3 paths to process.
52     :type paths: list
53     :returns: Spark DataFrame.
54     :rtype: DataFrame
55     """
56     drop_subset = [
57         "duration",
58         "version"
59     ]
60
61     # load schemas
62     with open(f"stats_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
63         schema = StructType.fromJson(load(f_schema))
64
65     # create empty DF out of schemas
66     sdf = spark.createDataFrame([], schema)
67
68     # filter list
69     filtered = [path for path in paths if "tests/suite.info.json.gz" in path]
70
71     # select
72     for path in filtered:
73         print(path)
74
75         sdf_loaded = spark \
76             .read \
77             .option("multiline", "true") \
78             .schema(schema) \
79             .json(path) \
80             .withColumn("job", lit(path.split("/")[4])) \
81             .withColumn("build", lit(path.split("/")[5])) \
82             .withColumn("stats_type", lit(schema_name))
83         sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
84
85     # drop rows with all nulls and drop rows with null in critical frames
86     sdf = sdf.na.drop(how="all")
87     sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
88
89     return sdf
90
91
92 # create SparkContext and GlueContext
93 spark_context = SparkContext.getOrCreate()
94 spark_context.setLogLevel("WARN")
95 glue_context = GlueContext(spark_context)
96 spark = glue_context.spark_session
97
98 # files of interest
99 paths = wr.s3.list_objects(
100     path=PATH,
101     suffix=SUFFIX,
102     last_modified_begin=LAST_MODIFIED_BEGIN,
103     last_modified_end=LAST_MODIFIED_END,
104     ignore_suffix=IGNORE_SUFFIX,
105     ignore_empty=True
106 )
107
108 for schema_name in ["sra"]:
109     out_sdf = process_json_to_dataframe(schema_name, paths)
110     out_sdf.printSchema()
111     out_sdf = out_sdf \
112         .withColumn("year", lit(datetime.now().year)) \
113         .withColumn("month", lit(datetime.now().month)) \
114         .withColumn("day", lit(datetime.now().day)) \
115         .repartition(1)
116
117     try:
118         boto3_session = session.Session(
119             aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
120             aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
121             region_name=environ["OUT_AWS_DEFAULT_REGION"]
122         )
123     except KeyError:
124         boto3_session = session.Session()
125
126     try:
127         wr.s3.to_parquet(
128             df=out_sdf.toPandas(),
129             path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/stats",
130             dataset=True,
131             partition_cols=["stats_type", "year", "month", "day"],
132             compression="snappy",
133             use_threads=True,
134             mode="overwrite_partitions",
135             boto3_session=boto3_session
136         )
137     except EmptyDataFrame:
138         pass