feat(etl): Add rls2210 62/37262/3
author    pmikus <peter.mikus@protonmail.ch>
Tue, 27 Sep 2022 12:23:42 +0000 (14:23 +0200)
committer Peter Mikus <peter.mikus@protonmail.ch>
Wed, 28 Sep 2022 08:59:36 +0000 (08:59 +0000)
Signed-off-by: pmikus <peter.mikus@protonmail.ch>
Change-Id: Icda348f7381255deb27b1ada69fcb9fbd4ead600

csit.infra.etl/coverage_device_rls2210.py [moved from csit.infra.etl/coverage_rls2206.py with 79% similarity]
csit.infra.etl/coverage_mrr_rls2210.py [new file with mode: 0644]
csit.infra.etl/coverage_ndrpdr_rls2210.py [new file with mode: 0644]
csit.infra.etl/coverage_soak_rls2210.py [new file with mode: 0644]
csit.infra.etl/iterative_mrr_rls2210.py [moved from csit.infra.etl/iterative_rls2206.py with 81% similarity]
csit.infra.etl/iterative_ndrpdr_rls2210.py [new file with mode: 0644]
csit.infra.etl/iterative_soak_rls2210.py [new file with mode: 0644]
csit.infra.etl/trending_mrr.py [moved from csit.infra.etl/trending.py with 81% similarity]
csit.infra.etl/trending_ndrpdr.py [new file with mode: 0644]
csit.infra.etl/trending_soak.py [new file with mode: 0644]
fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl

similarity index 79%
rename from csit.infra.etl/coverage_rls2206.py
rename to csit.infra.etl/coverage_device_rls2210.py
index 4e2619d..9c9e1c9 100644 (file)
@@ -141,31 +141,30 @@ paths = wr.s3.list_objects(
     ignore_empty=True
 )
 
-filtered_paths = [path for path in paths if "report-coverage-2206" in path]
-
-for schema_name in ["mrr", "ndrpdr", "soak", "device"]:
-    out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
-    out_sdf.printSchema()
-    out_sdf = out_sdf \
-        .withColumn("year", lit(datetime.now().year)) \
-        .withColumn("month", lit(datetime.now().month)) \
-        .withColumn("day", lit(datetime.now().day)) \
-        .repartition(1)
-
-    try:
-        wr.s3.to_parquet(
-            df=out_sdf.toPandas(),
-            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2206",
-            dataset=True,
-            partition_cols=["test_type", "year", "month", "day"],
-            compression="snappy",
-            use_threads=True,
-            mode="overwrite_partitions",
-            boto3_session=session.Session(
-                aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
-                aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
-                region_name=environ["OUT_AWS_DEFAULT_REGION"]
-            )
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
         )
-    except EmptyDataFrame:
-        pass
+    )
+except EmptyDataFrame:
+    pass
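
Aside (illustrative, not part of the change): with dataset=True and partition_cols as above, wr.s3.to_parquet lays the data out as Hive-style key=value prefixes under the target path, and mode="overwrite_partitions" only replaces the partitions being written. A minimal sketch of the resulting layout; the object name and date values below are made up.

    # Illustrative only: bucket layout produced by a call like the one above;
    # the parquet file name and the date partition values are invented.
    example_key = (
        "s3://fdio-docs-s3-cloudfront-index/csit/parquet/coverage_rls2210/"
        "test_type=device/year=2022/month=9/day=27/example.snappy.parquet"
    )
    print(example_key)
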
diff --git a/csit.infra.etl/coverage_mrr_rls2210.py b/csit.infra.etl/coverage_mrr_rls2210.py
new file mode 100644 (file)
index 0000000..9c9e1c9
--- /dev/null
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+    "suite.info.json.gz",
+    "setup.info.json.gz",
+    "teardown.info.json.gz",
+    "suite.output.info.json.gz",
+    "setup.output.info.json.gz",
+    "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+    datetime.strptime(
+        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+        "%Y-%m-%d"
+    )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there nested structered columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnest DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                        .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+        schema = StructType.fromJson(load(f_schema))
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit(path.split("/")[4])) \
+            .withColumn("build", lit(path.split("/")[5]))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical frames
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+    path=PATH,
+    suffix=SUFFIX,
+    last_modified_begin=LAST_MODIFIED_BEGIN,
+    last_modified_end=LAST_MODIFIED_END,
+    ignore_suffix=IGNORE_SUFFIX,
+    ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
+        )
+    )
+except EmptyDataFrame:
+    pass
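
Aside (illustrative, not part of the commit): a minimal sketch of what flatten_frame() above does to a nested column, assuming a plain local SparkSession and a toy two-field schema instead of the Glue session and the CSIT schemas.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # Toy frame with a single struct column; the real CSIT schemas are larger.
    nested_sdf = spark.createDataFrame(
        [((1.0, "pps"), "mrr")],
        "result struct<rate:double,unit:string>, test_type string"
    )

    # flatten_frame() walks struct columns and joins parent and leaf names
    # with "_", so for this toy schema it matches the projection below.
    flat_sdf = nested_sdf.select(
        col("result.rate").alias("result_rate"),
        col("result.unit").alias("result_unit"),
        col("test_type")
    )
    flat_sdf.show()
    # +-----------+-----------+---------+
    # |result_rate|result_unit|test_type|
    # +-----------+-----------+---------+
    # |        1.0|        pps|      mrr|
    # +-----------+-----------+---------+
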
diff --git a/csit.infra.etl/coverage_ndrpdr_rls2210.py b/csit.infra.etl/coverage_ndrpdr_rls2210.py
new file mode 100644 (file)
index 0000000..9c9e1c9
--- /dev/null
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+    "suite.info.json.gz",
+    "setup.info.json.gz",
+    "teardown.info.json.gz",
+    "suite.output.info.json.gz",
+    "setup.output.info.json.gz",
+    "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+    datetime.strptime(
+        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+        "%Y-%m-%d"
+    )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there nested structered columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnest DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                        .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+        schema = StructType.fromJson(load(f_schema))
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit(path.split("/")[4])) \
+            .withColumn("build", lit(path.split("/")[5]))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical frames
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+    path=PATH,
+    suffix=SUFFIX,
+    last_modified_begin=LAST_MODIFIED_BEGIN,
+    last_modified_end=LAST_MODIFIED_END,
+    ignore_suffix=IGNORE_SUFFIX,
+    ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
+        )
+    )
+except EmptyDataFrame:
+    pass
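
Aside (illustrative, not part of the change): the "job" and "build" columns added in process_json_to_dataframe() come from fixed positions in the S3 key. A short sketch; the key below follows the PATH pattern used by these scripts but is otherwise invented.

    # Illustrative only: everything after the build number in this key is made up.
    path = (
        "s3://fdio-logs-s3-cloudfront-index/vex-yul-rot-jenkins-1/"
        "csit-vpp-perf-report-coverage-2210-2n-icx/21/archives/output.info.json.gz"
    )
    parts = path.split("/")
    print(parts[4])  # csit-vpp-perf-report-coverage-2210-2n-icx -> "job" column
    print(parts[5])  # 21                                        -> "build" column
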
diff --git a/csit.infra.etl/coverage_soak_rls2210.py b/csit.infra.etl/coverage_soak_rls2210.py
new file mode 100644 (file)
index 0000000..9c9e1c9
--- /dev/null
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+    "suite.info.json.gz",
+    "setup.info.json.gz",
+    "teardown.info.json.gz",
+    "suite.output.info.json.gz",
+    "setup.output.info.json.gz",
+    "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+    datetime.strptime(
+        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+        "%Y-%m-%d"
+    )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there nested structered columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnest DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                        .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+        schema = StructType.fromJson(load(f_schema))
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit(path.split("/")[4])) \
+            .withColumn("build", lit(path.split("/")[5]))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical frames
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+    path=PATH,
+    suffix=SUFFIX,
+    last_modified_begin=LAST_MODIFIED_BEGIN,
+    last_modified_end=LAST_MODIFIED_END,
+    ignore_suffix=IGNORE_SUFFIX,
+    ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
+        )
+    )
+except EmptyDataFrame:
+    pass
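
Aside (illustrative, not part of the change): unionByName(..., allowMissingColumns=True), used above when appending each loaded file to the schema-shaped empty frame, fills columns missing on either side with nulls (available since Spark 3.1). A toy sketch under a local SparkSession; the column names here are made up.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    a = spark.createDataFrame([(1,)], "passed int")
    b = spark.createDataFrame([("vpp",)], "dut_type string")

    # Columns absent from one side come back as null for rows from that side.
    a.unionByName(b, allowMissingColumns=True).show()
    # +------+--------+
    # |passed|dut_type|
    # +------+--------+
    # |     1|    null|
    # |  null|     vpp|
    # +------+--------+
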
similarity index 81%
rename from csit.infra.etl/iterative_rls2206.py
rename to csit.infra.etl/iterative_mrr_rls2210.py
index 88c644b..b7a8dbc 100644 (file)
@@ -141,31 +141,30 @@ paths = wr.s3.list_objects(
     ignore_empty=True
 )
 
-filtered_paths = [path for path in paths if "report-iterative-2206" in path]
-
-for schema_name in ["mrr", "ndrpdr", "soak"]:
-    out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
-    out_sdf.printSchema()
-    out_sdf = out_sdf \
-        .withColumn("year", lit(datetime.now().year)) \
-        .withColumn("month", lit(datetime.now().month)) \
-        .withColumn("day", lit(datetime.now().day)) \
-        .repartition(1)
-
-    try:
-        wr.s3.to_parquet(
-            df=out_sdf.toPandas(),
-            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2206",
-            dataset=True,
-            partition_cols=["test_type", "year", "month", "day"],
-            compression="snappy",
-            use_threads=True,
-            mode="overwrite_partitions",
-            boto3_session=session.Session(
-                aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
-                aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
-                region_name=environ["OUT_AWS_DEFAULT_REGION"]
-            )
+filtered_paths = [path for path in paths if "report-iterative-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
         )
-    except EmptyDataFrame:
-        pass
+    )
+except EmptyDataFrame:
+    pass
diff --git a/csit.infra.etl/iterative_ndrpdr_rls2210.py b/csit.infra.etl/iterative_ndrpdr_rls2210.py
new file mode 100644 (file)
index 0000000..70ab815
--- /dev/null
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+    "suite.info.json.gz",
+    "setup.info.json.gz",
+    "teardown.info.json.gz",
+    "suite.output.info.json.gz",
+    "setup.output.info.json.gz",
+    "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+    datetime.strptime(
+        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+        "%Y-%m-%d"
+    )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there nested structered columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnest DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                        .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+        schema = StructType.fromJson(load(f_schema))
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit(path.split("/")[4])) \
+            .withColumn("build", lit(path.split("/")[5]))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical frames
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+    path=PATH,
+    suffix=SUFFIX,
+    last_modified_begin=LAST_MODIFIED_BEGIN,
+    last_modified_end=LAST_MODIFIED_END,
+    ignore_suffix=IGNORE_SUFFIX,
+    ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-iterative-2210" in path]
+
+out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
+        )
+    )
+except EmptyDataFrame:
+    pass
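
Aside (illustrative, not part of the change): the LAST_MODIFIED_BEGIN/LAST_MODIFIED_END constants near the top of these scripts bound the S3 listing to objects modified during the previous UTC day. A small worked example with the run date hard-coded:

    from datetime import datetime, timedelta
    from pytz import utc

    now = datetime(2022, 9, 27, 14, 23)  # pretend "today"; value is illustrative
    last_modified_end = utc.localize(
        datetime.strptime(f"{now.year}-{now.month}-{now.day}", "%Y-%m-%d")
    )
    last_modified_begin = last_modified_end - timedelta(1)
    print(last_modified_begin)  # 2022-09-26 00:00:00+00:00
    print(last_modified_end)    # 2022-09-27 00:00:00+00:00
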
diff --git a/csit.infra.etl/iterative_soak_rls2210.py b/csit.infra.etl/iterative_soak_rls2210.py
new file mode 100644 (file)
index 0000000..b74d7b4
--- /dev/null
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+    "suite.info.json.gz",
+    "setup.info.json.gz",
+    "teardown.info.json.gz",
+    "suite.output.info.json.gz",
+    "setup.output.info.json.gz",
+    "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+    datetime.strptime(
+        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+        "%Y-%m-%d"
+    )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there nested structered columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnest DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                        .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+        schema = StructType.fromJson(load(f_schema))
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit(path.split("/")[4])) \
+            .withColumn("build", lit(path.split("/")[5]))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical frames
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+    path=PATH,
+    suffix=SUFFIX,
+    last_modified_begin=LAST_MODIFIED_BEGIN,
+    last_modified_end=LAST_MODIFIED_END,
+    ignore_suffix=IGNORE_SUFFIX,
+    ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-iterative-2210" in path]
+
+out_sdf = process_json_to_dataframe("soak", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
+        )
+    )
+except EmptyDataFrame:
+    pass
similarity index 81%
rename from csit.infra.etl/trending.py
rename to csit.infra.etl/trending_mrr.py
index bc27aaa..1ba8c69 100644 (file)
@@ -143,30 +143,29 @@ paths = wr.s3.list_objects(
 
 filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
 
-for schema_name in ["mrr", "ndrpdr", "soak"]:
-    out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
-    out_sdf.show(truncate=False)
-    out_sdf.printSchema()
-    out_sdf = out_sdf \
-        .withColumn("year", lit(datetime.now().year)) \
-        .withColumn("month", lit(datetime.now().month)) \
-        .withColumn("day", lit(datetime.now().day)) \
-        .repartition(1)
-
-    try:
-        wr.s3.to_parquet(
-            df=out_sdf.toPandas(),
-            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
-            dataset=True,
-            partition_cols=["test_type", "year", "month", "day"],
-            compression="snappy",
-            use_threads=True,
-            mode="overwrite_partitions",
-            boto3_session=session.Session(
-                aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
-                aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
-                region_name=environ["OUT_AWS_DEFAULT_REGION"]
-            )
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
         )
-    except EmptyDataFrame:
-        pass
+    )
+except EmptyDataFrame:
+    pass
diff --git a/csit.infra.etl/trending_ndrpdr.py b/csit.infra.etl/trending_ndrpdr.py
new file mode 100644 (file)
index 0000000..d3c51ba
--- /dev/null
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+    "suite.info.json.gz",
+    "setup.info.json.gz",
+    "teardown.info.json.gz",
+    "suite.output.info.json.gz",
+    "setup.output.info.json.gz",
+    "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+    datetime.strptime(
+        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+        "%Y-%m-%d"
+    )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there nested structered columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnest DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                        .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    with open(f"trending_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+        schema = StructType.fromJson(load(f_schema))
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit(path.split("/")[4])) \
+            .withColumn("build", lit(path.split("/")[5]))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical frames
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+    path=PATH,
+    suffix=SUFFIX,
+    last_modified_begin=LAST_MODIFIED_BEGIN,
+    last_modified_end=LAST_MODIFIED_END,
+    ignore_suffix=IGNORE_SUFFIX,
+    ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
+
+out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
+        )
+    )
+except EmptyDataFrame:
+    pass
diff --git a/csit.infra.etl/trending_soak.py b/csit.infra.etl/trending_soak.py
new file mode 100644 (file)
index 0000000..e54cf9f
--- /dev/null
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+    "suite.info.json.gz",
+    "setup.info.json.gz",
+    "teardown.info.json.gz",
+    "suite.output.info.json.gz",
+    "setup.output.info.json.gz",
+    "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+    datetime.strptime(
+        f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+        "%Y-%m-%d"
+    )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there nested structered columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnest DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                        .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    with open(f"trending_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+        schema = StructType.fromJson(load(f_schema))
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit(path.split("/")[4])) \
+            .withColumn("build", lit(path.split("/")[5]))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical frames
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+    path=PATH,
+    suffix=SUFFIX,
+    last_modified_begin=LAST_MODIFIED_BEGIN,
+    last_modified_end=LAST_MODIFIED_END,
+    ignore_suffix=IGNORE_SUFFIX,
+    ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
+
+out_sdf = process_json_to_dataframe("soak", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
+        )
+    )
+except EmptyDataFrame:
+    pass
index 0d0ecfa..0abb0e5 100644 (file)
 job "${job_name}" {
-  # The "datacenters" parameter specifies the list of datacenters which should
-  # be considered when placing this task. This must be provided.
   datacenters = "${datacenters}"
-
-  # The "type" parameter controls the type of job, which impacts the scheduler's
-  # decision on placement. For a full list of job types and their differences,
-  # please see the online documentation.
-  #
-  #     https://www.nomadproject.io/docs/jobspec/schedulers
-  #
   type        = "${type}"
-
-  # The periodic stanza allows a job to run at fixed times, dates, or intervals.
-  # The easiest way to think about the periodic scheduler is "Nomad cron" or
-  # "distributed cron".
-  #
-  #     https://www.nomadproject.io/docs/job-specification/periodic
-  #
   periodic {
     cron             = "${cron}"
     prohibit_overlap = "${prohibit_overlap}"
     time_zone        = "${time_zone}"
   }
-
-  # The "group" stanza defines a series of tasks that should be co-located on
-  # the same Nomad client. Any task within a group will be placed on the same
-  # client.
-  #
-  #     https://www.nomadproject.io/docs/job-specification/group
-  #
   group "${job_name}-master" {
-    # The restart stanza configures a tasks's behavior on task failure. Restarts
-    # happen on the client that is running the task.
-    #
-    # https://www.nomadproject.io/docs/job-specification/restart
-    #
     restart {
       mode = "fail"
     }
-
-    # The constraint allows restricting the set of eligible nodes. Constraints
-    # may filter on attributes or client metadata.
-    #
-    # For more information and examples on the "volume" stanza, please see
-    # the online documentation at:
-    #
-    #     https://www.nomadproject.io/docs/job-specification/constraint
-    #
     constraint {
       attribute       = "$${attr.cpu.arch}"
       operator        = "!="
       value           = "arm64"
     }
-
     constraint {
       attribute      = "$${node.class}"
       value          = "builder"
     }
-
-    # The "task" stanza creates an individual unit of work, such as a Docker
-    # container, web application, or batch processing.
-    #
-    #     https://www.nomadproject.io/docs/job-specification/task.html
-    #
-    task "${job_name}-trending" {
-      # The artifact stanza instructs Nomad to fetch and unpack a remote
-      # resource, such as a file, tarball, or binary.
-      #
-      #     https://www.nomadproject.io/docs/job-specification/artifact
-      #
+    task "${job_name}-trending-mrr" {
       artifact {
         source      = "git::https://github.com/FDio/csit"
         destination = "local/csit"
       }
-
-      # The "driver" parameter specifies the task driver that should be used to
-      # run the task.
       driver = "docker"
-
-      # The "config" stanza specifies the driver configuration, which is passed
-      # directly to the driver to start the task. The details of configurations
-      # are specific to each driver, so please see specific driver
-      # documentation for more information.
       config {
         image   = "${image}"
         command = "gluesparksubmit"
         args = [
           "--driver-memory", "30g",
           "--executor-memory", "30g",
-          "trending.py"
+          "trending_mrr.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-trending-ndrpdr" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "30g",
+          "--executor-memory", "30g",
+          "trending_ndrpdr.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-trending-soak" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "30g",
+          "--executor-memory", "30g",
+          "trending_soak.py"
         ]
         work_dir = "/local/csit/csit.infra.etl"
       }
-
-      # The env stanza configures a list of environment variables to populate
-      # the task's environment before starting.
       env {
         AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
         AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
@@ -104,38 +104,17 @@ job "${job_name}" {
         OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
         ${ envs }
       }
-
-      # The "resources" stanza describes the requirements a task needs to
-      # execute. Resource requirements include memory, network, cpu, and more.
-      # This ensures the task will execute on a machine that contains enough
-      # resource capacity.
-      #
-      #     https://www.nomadproject.io/docs/job-specification/resources
-      #
       resources {
         cpu    = ${cpu}
         memory = ${memory}
       }
     }
     task "${job_name}-stats" {
-      # The artifact stanza instructs Nomad to fetch and unpack a remote
-      # resource, such as a file, tarball, or binary.
-      #
-      #     https://www.nomadproject.io/docs/job-specification/artifact
-      #
       artifact {
         source      = "git::https://github.com/FDio/csit"
         destination = "local/csit"
       }
-
-      # The "driver" parameter specifies the task driver that should be used to
-      # run the task.
       driver = "docker"
-
-      # The "config" stanza specifies the driver configuration, which is passed
-      # directly to the driver to start the task. The details of configurations
-      # are specific to each driver, so please see specific driver
-      # documentation for more information.
       config {
         image   = "${image}"
         command = "gluesparksubmit"
@@ -146,9 +125,6 @@ job "${job_name}" {
         ]
         work_dir = "/local/csit/csit.infra.etl"
       }
-
-      # The env stanza configures a list of environment variables to populate
-      # the task's environment before starting.
       env {
         AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
         AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
@@ -158,165 +134,248 @@ job "${job_name}" {
         OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
         ${ envs }
       }
-
-      # The "resources" stanza describes the requirements a task needs to
-      # execute. Resource requirements include memory, network, cpu, and more.
-      # This ensures the task will execute on a machine that contains enough
-      # resource capacity.
-      #
-      #     https://www.nomadproject.io/docs/job-specification/resources
-      #
       resources {
         cpu    = ${cpu}
         memory = ${memory}
       }
     }
   }
-#  group "${job_name}-rls2206" {
-#    # The restart stanza configures a tasks's behavior on task failure. Restarts
-#    # happen on the client that is running the task.
-#    #
-#    # https://www.nomadproject.io/docs/job-specification/restart
-#    #
-#    restart {
-#      mode = "fail"
-#    }
-#
-#    # The constraint allows restricting the set of eligible nodes. Constraints
-#    # may filter on attributes or client metadata.
-#    #
-#    # For more information and examples on the "volume" stanza, please see
-#    # the online documentation at:
-#    #
-#    #     https://www.nomadproject.io/docs/job-specification/constraint
-#    #
-#    constraint {
-#      attribute       = "$${attr.cpu.arch}"
-#      operator        = "!="
-#      value           = "arm64"
-#    }
-#
-#    constraint {
-#      attribute      = "$${node.class}"
-#      value          = "builder"
-#    }
-#
-#    # The "task" stanza creates an individual unit of work, such as a Docker
-#    # container, web application, or batch processing.
-#    #
-#    #     https://www.nomadproject.io/docs/job-specification/task.html
-#    #
-#    task "${job_name}-coverage" {
-#      # The artifact stanza instructs Nomad to fetch and unpack a remote
-#      # resource, such as a file, tarball, or binary.
-#      #
-#      #     https://www.nomadproject.io/docs/job-specification/artifact
-#      #
-#      artifact {
-#        source      = "git::https://github.com/FDio/csit"
-#        destination = "local/csit"
-#      }
-#
-#      # The "driver" parameter specifies the task driver that should be used to
-#      # run the task.
-#      driver = "docker"
-#
-#      # The "config" stanza specifies the driver configuration, which is passed
-#      # directly to the driver to start the task. The details of configurations
-#      # are specific to each driver, so please see specific driver
-#      # documentation for more information.
-#      config {
-#        image   = "${image}"
-#        command = "gluesparksubmit"
-#        args = [
-#          "--driver-memory", "20g",
-#          "--executor-memory", "20g",
-#          "--executor-cores", "2",
-#          "--master", "local[2]",
-#          "coverage_rls2206.py"
-#        ]
-#        work_dir = "/local/csit/csit.infra.etl"
-#      }
-#
-#      # The env stanza configures a list of environment variables to populate
-#      # the task's environment before starting.
-#      env {
-#        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
-#        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
-#        AWS_DEFAULT_REGION        = "${aws_default_region}"
-#        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
-#        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
-#        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
-#        ${ envs }
-#      }
-#
-#      # The "resources" stanza describes the requirements a task needs to
-#      # execute. Resource requirements include memory, network, cpu, and more.
-#      # This ensures the task will execute on a machine that contains enough
-#      # resource capacity.
-#      #
-#      #     https://www.nomadproject.io/docs/job-specification/resources
-#      #
-#      resources {
-#        cpu    = ${cpu}
-#        memory = ${memory}
-#      }
-#    }
-#    task "${job_name}-iterative" {
-#      # The artifact stanza instructs Nomad to fetch and unpack a remote
-#      # resource, such as a file, tarball, or binary.
-#      #
-#      #     https://www.nomadproject.io/docs/job-specification/artifact
-#      #
-#      artifact {
-#        source      = "git::https://github.com/FDio/csit"
-#        destination = "local/csit"
-#      }
-#
-#      # The "driver" parameter specifies the task driver that should be used to
-#      # run the task.
-#      driver = "docker"
-#
-#      # The "config" stanza specifies the driver configuration, which is passed
-#      # directly to the driver to start the task. The details of configurations
-#      # are specific to each driver, so please see specific driver
-#      # documentation for more information.
-#      config {
-#        image   = "${image}"
-#        command = "gluesparksubmit"
-#        args = [
-#          "--driver-memory", "20g",
-#          "--executor-memory", "20g",
-#          "--executor-cores", "2",
-#          "--master", "local[2]",
-#          "iterative_rls2206.py"
-#        ]
-#        work_dir = "/local/csit/csit.infra.etl"
-#      }
-#
-#      # The env stanza configures a list of environment variables to populate
-#      # the task's environment before starting.
-#      env {
-#        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
-#        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
-#        AWS_DEFAULT_REGION        = "${aws_default_region}"
-#        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
-#        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
-#        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
-#        ${ envs }
-#      }
-#
-#      # The "resources" stanza describes the requirements a task needs to
-#      # execute. Resource requirements include memory, network, cpu, and more.
-#      # This ensures the task will execute on a machine that contains enough
-#      # resource capacity.
-#      #
-#      #     https://www.nomadproject.io/docs/job-specification/resources
-#      #
-#      resources {
-#        cpu    = ${cpu}
-#        memory = ${memory}
-#      }
-#    }
-#  }
+  group "${job_name}-rls2210" {
+    restart {
+      mode = "fail"
+    }
+    constraint {
+      attribute = "$${attr.cpu.arch}"
+      operator  = "!="
+      value     = "arm64"
+    }
+    constraint {
+      attribute = "$${node.class}"
+      value     = "builder"
+    }
+    task "${job_name}-coverage-device" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "20g",
+          "--executor-memory", "20g",
+          "--executor-cores", "2",
+          "--master", "local[2]",
+          "coverage_device_rls2210.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-coverage-mrr" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "20g",
+          "--executor-memory", "20g",
+          "--executor-cores", "2",
+          "--master", "local[2]",
+          "coverage_mrr_rls2210.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-coverage-ndrpdr" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "20g",
+          "--executor-memory", "20g",
+          "--executor-cores", "2",
+          "--master", "local[2]",
+          "coverage_ndrpdr_rls2210.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-coverage-soak" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "20g",
+          "--executor-memory", "20g",
+          "--executor-cores", "2",
+          "--master", "local[2]",
+          "coverage_soak_rls2210.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-iterative-mrr" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "20g",
+          "--executor-memory", "20g",
+          "--executor-cores", "2",
+          "--master", "local[2]",
+          "iterative_mrr_rls2210.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-iterative-ndrpdr" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "20g",
+          "--executor-memory", "20g",
+          "--executor-cores", "2",
+          "--master", "local[2]",
+          "iterative_ndrpdr_rls2210.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+    task "${job_name}-iterative-soak" {
+      artifact {
+        source      = "git::https://github.com/FDio/csit"
+        destination = "local/csit"
+      }
+      driver = "docker"
+      config {
+        image   = "${image}"
+        command = "gluesparksubmit"
+        args = [
+          "--driver-memory", "20g",
+          "--executor-memory", "20g",
+          "--executor-cores", "2",
+          "--master", "local[2]",
+          "iterative_soak_rls2210.py"
+        ]
+        work_dir = "/local/csit/csit.infra.etl"
+      }
+      env {
+        AWS_ACCESS_KEY_ID         = "${aws_access_key_id}"
+        AWS_SECRET_ACCESS_KEY     = "${aws_secret_access_key}"
+        AWS_DEFAULT_REGION        = "${aws_default_region}"
+        OUT_AWS_ACCESS_KEY_ID     = "${out_aws_access_key_id}"
+        OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+        OUT_AWS_DEFAULT_REGION    = "${out_aws_default_region}"
+        ${ envs }
+      }
+      resources {
+        cpu    = ${cpu}
+        memory = ${memory}
+      }
+    }
+  }
 }
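
Note on consuming this template: the ${...} placeholders above (job_name, image, cpu, memory, envs and the AWS credential variables) are Terraform template variables, while the double-dollar forms such as $${attr.cpu.arch} and $${node.class} are escapes so that Nomad's own runtime interpolation survives templatefile() rendering. Below is a minimal sketch of how the rendered job spec might be registered through the Terraform nomad provider; the resource name, file path, and variable values are illustrative assumptions and are not part of this change.

    # Illustrative sketch only: render the Nomad job template above and register
    # it with the nomad provider. All concrete values here are assumptions.
    resource "nomad_job" "etl" {
      jobspec = templatefile("${path.module}/conf/nomad/etl.hcl.tftpl", {  # path assumed
        job_name                  = "etl"
        image                     = "example.org/csit/etl:latest"          # hypothetical image
        cpu                       = 4000
        memory                    = 32768
        aws_access_key_id         = var.aws_access_key_id
        aws_secret_access_key     = var.aws_secret_access_key
        aws_default_region        = var.aws_default_region
        out_aws_access_key_id     = var.out_aws_access_key_id
        out_aws_secret_access_key = var.out_aws_secret_access_key
        out_aws_default_region    = var.out_aws_default_region
        envs                      = "ENV_EXTRA = \"example\""              # rendered into the env stanza
      })
    }

Only the variables visible in the hunks above are shown; the real module may pass additional ones.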