feat(uti): etl
[csit.git] / csit.infra.etl / stats.py
#!/usr/bin/env python3

# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16 """ETL script running on top of the s3://"""
17
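# The script lists suite.info.json.gz statistics files produced by CSIT
# performance jobs in the logs bucket, loads them into a Spark DataFrame
# using a per-stats-type JSON schema, and writes the result to the docs
# bucket as a partitioned Parquet dataset.
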
from datetime import datetime, timedelta
from json import load
from os import environ
from pytz import utc

import awswrangler as wr
from awswrangler.exceptions import EmptyDataFrame
from awsglue.context import GlueContext
from boto3 import session
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType

S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
SUFFIX="suite.info.json.gz"
IGNORE_SUFFIX=[]
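# Listing window: objects last modified between midnight UTC yesterday and
# midnight UTC today, i.e. (roughly) the previous full day.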
LAST_MODIFIED_END=utc.localize(
    datetime.strptime(
        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
        "%Y-%m-%d"
    )
)
LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)


def process_json_to_dataframe(schema_name, paths):
    """Processes JSON to Spark DataFrame.

    :param schema_name: Schema name.
    :type schema_name: string
    :param paths: S3 paths to process.
    :type paths: list
    :returns: Spark DataFrame.
    :rtype: DataFrame
    """
    drop_subset = [
        "duration",
        "version"
    ]

    # load the JSON schema for this stats type
    with open(f"stats_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
        schema = StructType.fromJson(load(f_schema))
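    # The schema file is expected to hold a JSON representation of a Spark
    # StructType, e.g. (illustrative field names only):
    #   {"type": "struct", "fields": [
    #       {"name": "version", "type": "string",
    #        "nullable": true, "metadata": {}}, ...]}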

    # create an empty DataFrame with that schema; per-file data is unioned in
    sdf = spark.createDataFrame([], schema)

    # keep only the tests/suite.info.json.gz statistics files
    filtered = [path for path in paths if "tests/suite.info.json.gz" in path]

    # load each file, tagging rows with the job name, build number and stats
    # type derived from its s3:// path
    for path in filtered:
        print(path)

        sdf_loaded = spark \
            .read \
            .option("multiline", "true") \
            .schema(schema) \
            .json(path) \
            .withColumn("job", lit(path.split("/")[4])) \
            .withColumn("build", lit(path.split("/")[5])) \
            .withColumn("stats_type", lit(schema_name))
        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)

    # drop rows that are entirely null and rows with null in critical columns
    sdf = sdf.na.drop(how="all")
    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)

    return sdf


# create SparkContext and GlueContext
spark_context = SparkContext.getOrCreate()
spark_context.setLogLevel("WARN")
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session
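# NOTE: this module-level `spark` session is the one referenced inside
# process_json_to_dataframe() above.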

# list statistics files of interest in the logs bucket
paths = wr.s3.list_objects(
    path=PATH,
    suffix=SUFFIX,
    last_modified_begin=LAST_MODIFIED_BEGIN,
    last_modified_end=LAST_MODIFIED_END,
    ignore_suffix=IGNORE_SUFFIX,
    ignore_empty=True
)

# process each statistics type; each has its own stats_<type>.json schema
for schema_name in ["sra"]:
    out_sdf = process_json_to_dataframe(schema_name, paths)
    out_sdf.show(truncate=False)
    out_sdf.printSchema()
    out_sdf = out_sdf \
        .withColumn("year", lit(datetime.now().year)) \
        .withColumn("month", lit(datetime.now().month)) \
        .withColumn("day", lit(datetime.now().day)) \
        .repartition(1)

    # write the dataset to the docs bucket, partitioned by stats type and
    # date; skip silently when there is nothing to write for the day
    try:
        wr.s3.to_parquet(
            df=out_sdf.toPandas(),
            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/stats",
            dataset=True,
            partition_cols=["stats_type", "year", "month", "day"],
            compression="snappy",
            use_threads=True,
            mode="overwrite_partitions",
            boto3_session=session.Session(
                aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
                aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
                region_name=environ["OUT_AWS_DEFAULT_REGION"]
            )
        )
    except EmptyDataFrame:
        pass
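
# To read the resulting dataset back (illustrative sketch, assuming read
# access to the docs bucket):
#
#     import awswrangler as wr
#     stats_df = wr.s3.read_parquet(
#         path="s3://fdio-docs-s3-cloudfront-index/csit/parquet/stats",
#         dataset=True
#     )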