feat(telemetry): Add bpf on all testbeds
[csit.git] / csit.infra.etl / local.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2022 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """ETL script running on top of the localhost"""
17
18 from datetime import datetime
19 from json import dump, load
20 from pathlib import Path
21
22 from awsglue.context import GlueContext
23 from pyspark.context import SparkContext
24 from pyspark.sql.functions import col, lit, regexp_replace
25 from pyspark.sql.types import StructType
26
27
28 PATH="/app/tests"
29 SUFFIX="info.json"
30 IGNORE_SUFFIX=[
31     "suite.info.json",
32     "setup.info.json",
33     "teardown.info.json",
34     "suite.output.info.json",
35     "setup.output.info.json",
36     "teardown.output.info.json"
37 ]
38
39
40 def schema_dump(schema, option):
41     """Dumps Spark DataFrame schema into JSON file.
42
43     :param schema: DataFrame schema.
44     :type schema: StructType
45     :param option: File name suffix for the DataFrame schema.
46     :type option: string
47     """
48     with open(f"trending_{option}.json", "w", encoding="UTF-8") as f_schema:
49         dump(schema.jsonValue(), f_schema, indent=4, sort_keys=True)
50
51
52 def schema_load(option):
53     """Loads Spark DataFrame schema from JSON file.
54
55     :param option: File name suffix for the DataFrame schema.
56     :type option: string
57     :returns: DataFrame schema.
58     :rtype: StructType
59     """
60     with open(f"trending_{option}.json", "r", encoding="UTF-8") as f_schema:
61         return StructType.fromJson(load(f_schema))
62
63
64 def schema_dump_from_json(option):
65     """Loads JSON with data and dumps Spark DataFrame schema into JSON file.
66
67     :param option: File name suffix for the JSON data.
68     :type option: string
69     """
70     schema_dump(spark \
71         .read \
72         .option("multiline", "true") \
73         .json(f"data_{option}.json") \
74         .schema, option
75     )
76
77
78 def flatten_frame(nested_sdf):
79     """Unnest Spark DataFrame in case there nested structered columns.
80
81     :param nested_sdf: Spark DataFrame.
82     :type nested_sdf: DataFrame
83     :returns: Unnest DataFrame.
84     :rtype: DataFrame
85     """
86     stack = [((), nested_sdf)]
87     columns = []
88     while len(stack) > 0:
89         parents, sdf = stack.pop()
90         for column_name, column_type in sdf.dtypes:
91             if column_type[:6] == "struct":
92                 projected_sdf = sdf.select(column_name + ".*")
93                 stack.append((parents + (column_name,), projected_sdf))
94             else:
95                 columns.append(
96                     col(".".join(parents + (column_name,))) \
97                         .alias("_".join(parents + (column_name,)))
98                 )
99     return nested_sdf.select(columns)
100
101
102 def process_json_to_dataframe(schema_name, paths):
103     """Processes JSON to Spark DataFrame.
104
105     :param schema_name: Schema name.
106     :type schema_name: string
107     :param paths: S3 paths to process.
108     :type paths: list
109     :returns: Spark DataFrame.
110     :rtype: DataFrame
111     """
112     drop_subset = [
113         "dut_type", "dut_version",
114         "passed",
115         "test_name_long", "test_name_short",
116         "test_type",
117         "version"
118     ]
119
120     # load schemas
121     schema = schema_load(schema_name)
122
123     # create empty DF out of schemas
124     sdf = spark.createDataFrame([], schema)
125
126     # filter list
127     filtered = [path for path in paths if schema_name in path]
128
129     # select
130     for path in filtered:
131         print(path)
132
133         sdf_loaded = spark \
134             .read \
135             .option("multiline", "true") \
136             .schema(schema) \
137             .json(path) \
138             .withColumn("job", lit("local")) \
139             .withColumn("build", lit("unknown"))
140         sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
141
142     # drop rows with all nulls and drop rows with null in critical frames
143     sdf = sdf.na.drop(how="all")
144     sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
145
146     # flatten frame
147     sdf = flatten_frame(sdf)
148
149     return sdf
150
151
152 # create SparkContext and GlueContext
153 spark_context = SparkContext.getOrCreate()
154 spark_context.setLogLevel("WARN")
155 glue_context = GlueContext(spark_context)
156 spark = glue_context.spark_session
157
158 # files of interest
159 paths = []
160 for file in Path(PATH).glob(f"**/*{SUFFIX}"):
161     if file.name not in IGNORE_SUFFIX:
162         paths.append(str(file))
163
164 for schema_name in ["mrr", "ndrpdr", "soak"]:
165     out_sdf = process_json_to_dataframe(schema_name, paths)
166     out_sdf.show()
167     out_sdf.printSchema()
168     out_sdf \
169         .withColumn("year", lit(datetime.now().year)) \
170         .withColumn("month", lit(datetime.now().month)) \
171         .withColumn("day", lit(datetime.now().day)) \
172         .repartition(1) \
173         .write \
174         .partitionBy("test_type", "year", "month", "day") \
175         .mode("append") \
176         .parquet("local.parquet")