UTI: PoC - Dash application for Trending
[csit.git] / csit.infra.etl / coverage_rls2202.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2022 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """ETL script running on top of the s3://"""
17
18 from datetime import datetime, timedelta
19 from json import load
20 from os import environ
21 from pytz import utc
22
23 import awswrangler as wr
24 from awswrangler.exceptions import EmptyDataFrame
25 from awsglue.context import GlueContext
26 from boto3 import session
27 from pyspark.context import SparkContext
28 from pyspark.sql.functions import col, lit, regexp_replace
29 from pyspark.sql.types import StructType
30
31
32 S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
33 S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
34 PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
35 SUFFIX="info.json.gz"
36 IGNORE_SUFFIX=[
37     "suite.info.json.gz",
38     "setup.info.json.gz",
39     "teardown.info.json.gz",
40     "suite.output.info.json.gz",
41     "setup.output.info.json.gz",
42     "teardown.output.info.json.gz"
43 ]
44 LAST_MODIFIED_END=utc.localize(
45     datetime.strptime(
46         f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
47         "%Y-%m-%d"
48     )
49 )
50 LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
51
52
53 def flatten_frame(nested_sdf):
54     """Unnest Spark DataFrame in case there nested structered columns.
55
56     :param nested_sdf: Spark DataFrame.
57     :type nested_sdf: DataFrame
58     :returns: Unnest DataFrame.
59     :rtype: DataFrame
60     """
61     stack = [((), nested_sdf)]
62     columns = []
63     while len(stack) > 0:
64         parents, sdf = stack.pop()
65         for column_name, column_type in sdf.dtypes:
66             if column_type[:6] == "struct":
67                 projected_sdf = sdf.select(column_name + ".*")
68                 stack.append((parents + (column_name,), projected_sdf))
69             else:
70                 columns.append(
71                     col(".".join(parents + (column_name,))) \
72                         .alias("_".join(parents + (column_name,)))
73                 )
74     return nested_sdf.select(columns)
75
76
77 def process_json_to_dataframe(schema_name, paths):
78     """Processes JSON to Spark DataFrame.
79
80     :param schema_name: Schema name.
81     :type schema_name: string
82     :param paths: S3 paths to process.
83     :type paths: list
84     :returns: Spark DataFrame.
85     :rtype: DataFrame
86     """
87     drop_subset = [
88         "dut_type", "dut_version",
89         "passed",
90         "test_name_long", "test_name_short",
91         "test_type",
92         "version"
93     ]
94
95     # load schemas
96     with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
97         schema = StructType.fromJson(load(f_schema))
98
99     # create empty DF out of schemas
100     sdf = spark.createDataFrame([], schema)
101
102     # filter list
103     filtered = [path for path in paths if schema_name in path]
104
105     # select
106     for path in filtered:
107         print(path)
108
109         sdf_loaded = spark \
110             .read \
111             .option("multiline", "true") \
112             .schema(schema) \
113             .json(path) \
114             .withColumn("job", lit(path.split("/")[4])) \
115             .withColumn("build", lit(path.split("/")[5]))
116         sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
117
118     # drop rows with all nulls and drop rows with null in critical frames
119     sdf = sdf.na.drop(how="all")
120     sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
121
122     # flatten frame
123     sdf = flatten_frame(sdf)
124
125     return sdf
126
127
128 # create SparkContext and GlueContext
129 spark_context = SparkContext.getOrCreate()
130 spark_context.setLogLevel("WARN")
131 glue_context = GlueContext(spark_context)
132 spark = glue_context.spark_session
133
134 # files of interest
135 paths = wr.s3.list_objects(
136     path=PATH,
137     suffix=SUFFIX,
138     last_modified_begin=LAST_MODIFIED_BEGIN,
139     last_modified_end=LAST_MODIFIED_END,
140     ignore_suffix=IGNORE_SUFFIX,
141     ignore_empty=True
142 )
143
144 filtered_paths = [path for path in paths if "report-coverage-2202" in path]
145
146 for schema_name in ["mrr", "ndrpdr", "soak", "device"]:
147     out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
148     out_sdf.show(truncate=False)
149     out_sdf.printSchema()
150     out_sdf = out_sdf \
151         .withColumn("year", lit(datetime.now().year)) \
152         .withColumn("month", lit(datetime.now().month)) \
153         .withColumn("day", lit(datetime.now().day)) \
154         .repartition(1)
155
156     try:
157         wr.s3.to_parquet(
158             df=out_sdf.toPandas(),
159             path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2202",
160             dataset=True,
161             partition_cols=["test_type", "year", "month", "day"],
162             compression="snappy",
163             use_threads=True,
164             mode="overwrite_partitions",
165             boto3_session=session.Session(
166                 aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
167                 aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
168                 region_name=environ["OUT_AWS_DEFAULT_REGION"]
169             )
170         )
171     except EmptyDataFrame:
172         pass