From bff439b69ee71b654b1da92564ff62de7327fe71 Mon Sep 17 00:00:00 2001 From: pmikus Date: Tue, 27 Sep 2022 14:23:42 +0200 Subject: [PATCH] feat(etl): Add rls2210 Signed-off-by: pmikus Change-Id: Icda348f7381255deb27b1ada69fcb9fbd4ead600 --- ...erage_rls2206.py => coverage_device_rls2210.py} | 53 +- csit.infra.etl/coverage_mrr_rls2210.py | 170 +++++++ csit.infra.etl/coverage_ndrpdr_rls2210.py | 170 +++++++ csit.infra.etl/coverage_soak_rls2210.py | 170 +++++++ ...erative_rls2206.py => iterative_mrr_rls2210.py} | 53 +- csit.infra.etl/iterative_ndrpdr_rls2210.py | 170 +++++++ csit.infra.etl/iterative_soak_rls2210.py | 170 +++++++ csit.infra.etl/{trending.py => trending_mrr.py} | 51 +- csit.infra.etl/trending_ndrpdr.py | 171 +++++++ csit.infra.etl/trending_soak.py | 171 +++++++ .../conf/nomad/etl.hcl.tftpl | 541 ++++++++++++--------- 11 files changed, 1569 insertions(+), 321 deletions(-) rename csit.infra.etl/{coverage_rls2206.py => coverage_device_rls2210.py} (79%) create mode 100644 csit.infra.etl/coverage_mrr_rls2210.py create mode 100644 csit.infra.etl/coverage_ndrpdr_rls2210.py create mode 100644 csit.infra.etl/coverage_soak_rls2210.py rename csit.infra.etl/{iterative_rls2206.py => iterative_mrr_rls2210.py} (81%) create mode 100644 csit.infra.etl/iterative_ndrpdr_rls2210.py create mode 100644 csit.infra.etl/iterative_soak_rls2210.py rename csit.infra.etl/{trending.py => trending_mrr.py} (81%) create mode 100644 csit.infra.etl/trending_ndrpdr.py create mode 100644 csit.infra.etl/trending_soak.py diff --git a/csit.infra.etl/coverage_rls2206.py b/csit.infra.etl/coverage_device_rls2210.py similarity index 79% rename from csit.infra.etl/coverage_rls2206.py rename to csit.infra.etl/coverage_device_rls2210.py index 4e2619d924..9c9e1c9603 100644 --- a/csit.infra.etl/coverage_rls2206.py +++ b/csit.infra.etl/coverage_device_rls2210.py @@ -141,31 +141,30 @@ paths = wr.s3.list_objects( ignore_empty=True ) -filtered_paths = [path for path in paths if "report-coverage-2206" in path] - -for schema_name in ["mrr", "ndrpdr", "soak", "device"]: - out_sdf = process_json_to_dataframe(schema_name, filtered_paths) - out_sdf.printSchema() - out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - - try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2206", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) +filtered_paths = [path for path in paths if "report-coverage-2210" in path] + +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] ) - except EmptyDataFrame: - pass + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_mrr_rls2210.py b/csit.infra.etl/coverage_mrr_rls2210.py new file mode 100644 index 0000000000..9c9e1c9603 --- /dev/null +++ b/csit.infra.etl/coverage_mrr_rls2210.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2210" in path] + +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_ndrpdr_rls2210.py b/csit.infra.etl/coverage_ndrpdr_rls2210.py new file mode 100644 index 0000000000..9c9e1c9603 --- /dev/null +++ b/csit.infra.etl/coverage_ndrpdr_rls2210.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2210" in path] + +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_soak_rls2210.py b/csit.infra.etl/coverage_soak_rls2210.py new file mode 100644 index 0000000000..9c9e1c9603 --- /dev/null +++ b/csit.infra.etl/coverage_soak_rls2210.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2210" in path] + +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_rls2206.py b/csit.infra.etl/iterative_mrr_rls2210.py similarity index 81% rename from csit.infra.etl/iterative_rls2206.py rename to csit.infra.etl/iterative_mrr_rls2210.py index 88c644b625..b7a8dbcbfa 100644 --- a/csit.infra.etl/iterative_rls2206.py +++ b/csit.infra.etl/iterative_mrr_rls2210.py @@ -141,31 +141,30 @@ paths = wr.s3.list_objects( ignore_empty=True ) -filtered_paths = [path for path in paths if "report-iterative-2206" in path] - -for schema_name in ["mrr", "ndrpdr", "soak"]: - out_sdf = process_json_to_dataframe(schema_name, filtered_paths) - out_sdf.printSchema() - out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - - try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2206", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) +filtered_paths = [path for path in paths if "report-iterative-2210" in path] + +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] ) - except EmptyDataFrame: - pass + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_ndrpdr_rls2210.py b/csit.infra.etl/iterative_ndrpdr_rls2210.py new file mode 100644 index 0000000000..70ab8158a8 --- /dev/null +++ b/csit.infra.etl/iterative_ndrpdr_rls2210.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2210" in path] + +out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_soak_rls2210.py b/csit.infra.etl/iterative_soak_rls2210.py new file mode 100644 index 0000000000..b74d7b44dc --- /dev/null +++ b/csit.infra.etl/iterative_soak_rls2210.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2210" in path] + +out_sdf = process_json_to_dataframe("soak", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/trending.py b/csit.infra.etl/trending_mrr.py similarity index 81% rename from csit.infra.etl/trending.py rename to csit.infra.etl/trending_mrr.py index bc27aaa063..1ba8c69b1b 100644 --- a/csit.infra.etl/trending.py +++ b/csit.infra.etl/trending_mrr.py @@ -143,30 +143,29 @@ paths = wr.s3.list_objects( filtered_paths = [path for path in paths if "daily" in path or "weekly" in path] -for schema_name in ["mrr", "ndrpdr", "soak"]: - out_sdf = process_json_to_dataframe(schema_name, filtered_paths) - out_sdf.show(truncate=False) - out_sdf.printSchema() - out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - - try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] ) - except EmptyDataFrame: - pass + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/trending_ndrpdr.py b/csit.infra.etl/trending_ndrpdr.py new file mode 100644 index 0000000000..d3c51ba757 --- /dev/null +++ b/csit.infra.etl/trending_ndrpdr.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"trending_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "daily" in path or "weekly" in path] + +out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/trending_soak.py b/csit.infra.etl/trending_soak.py new file mode 100644 index 0000000000..e54cf9f18a --- /dev/null +++ b/csit.infra.etl/trending_soak.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"trending_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "daily" in path or "weekly" in path] + +out_sdf = process_json_to_dataframe("soak", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl index 0d0ecfa318..0abb0e5d51 100644 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl @@ -1,100 +1,100 @@ job "${job_name}" { - # The "datacenters" parameter specifies the list of datacenters which should - # be considered when placing this task. This must be provided. datacenters = "${datacenters}" - - # The "type" parameter controls the type of job, which impacts the scheduler's - # decision on placement. For a full list of job types and their differences, - # please see the online documentation. - # - # https://www.nomadproject.io/docs/jobspec/schedulers - # type = "${type}" - - # The periodic stanza allows a job to run at fixed times, dates, or intervals. - # The easiest way to think about the periodic scheduler is "Nomad cron" or - # "distributed cron". - # - # https://www.nomadproject.io/docs/job-specification/periodic - # periodic { cron = "${cron}" prohibit_overlap = "${prohibit_overlap}" time_zone = "${time_zone}" } - - # The "group" stanza defines a series of tasks that should be co-located on - # the same Nomad client. Any task within a group will be placed on the same - # client. - # - # https://www.nomadproject.io/docs/job-specification/group - # group "${job_name}-master" { - # The restart stanza configures a tasks's behavior on task failure. Restarts - # happen on the client that is running the task. - # - # https://www.nomadproject.io/docs/job-specification/restart - # restart { mode = "fail" } - - # The constraint allows restricting the set of eligible nodes. Constraints - # may filter on attributes or client metadata. - # - # For more information and examples on the "volume" stanza, please see - # the online documentation at: - # - # https://www.nomadproject.io/docs/job-specification/constraint - # constraint { attribute = "$${attr.cpu.arch}" operator = "!=" value = "arm64" } - constraint { attribute = "$${node.class}" value = "builder" } - - # The "task" stanza creates an individual unit of work, such as a Docker - # container, web application, or batch processing. - # - # https://www.nomadproject.io/docs/job-specification/task.html - # - task "${job_name}-trending" { - # The artifact stanza instructs Nomad to fetch and unpack a remote - # resource, such as a file, tarball, or binary. - # - # https://www.nomadproject.io/docs/job-specification/artifact - # + task "${job_name}-trending-mrr" { artifact { source = "git::https://github.com/FDio/csit" destination = "local/csit" } - - # The "driver" parameter specifies the task driver that should be used to - # run the task. driver = "docker" - - # The "config" stanza specifies the driver configuration, which is passed - # directly to the driver to start the task. The details of configurations - # are specific to each driver, so please see specific driver - # documentation for more information. config { image = "${image}" command = "gluesparksubmit" args = [ "--driver-memory", "30g", "--executor-memory", "30g", - "trending.py" + "trending_mrr.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-trending-ndrpdr" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "30g", + "--executor-memory", "30g", + "trending_ndrpdr.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-trending-soak" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "30g", + "--executor-memory", "30g", + "trending_soak.py" ] work_dir = "/local/csit/csit.infra.etl" } - - # The env stanza configures a list of environment variables to populate - # the task's environment before starting. env { AWS_ACCESS_KEY_ID = "${aws_access_key_id}" AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" @@ -104,38 +104,17 @@ job "${job_name}" { OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" ${ envs } } - - # The "resources" stanza describes the requirements a task needs to - # execute. Resource requirements include memory, network, cpu, and more. - # This ensures the task will execute on a machine that contains enough - # resource capacity. - # - # https://www.nomadproject.io/docs/job-specification/resources - # resources { cpu = ${cpu} memory = ${memory} } } task "${job_name}-stats" { - # The artifact stanza instructs Nomad to fetch and unpack a remote - # resource, such as a file, tarball, or binary. - # - # https://www.nomadproject.io/docs/job-specification/artifact - # artifact { source = "git::https://github.com/FDio/csit" destination = "local/csit" } - - # The "driver" parameter specifies the task driver that should be used to - # run the task. driver = "docker" - - # The "config" stanza specifies the driver configuration, which is passed - # directly to the driver to start the task. The details of configurations - # are specific to each driver, so please see specific driver - # documentation for more information. config { image = "${image}" command = "gluesparksubmit" @@ -146,9 +125,6 @@ job "${job_name}" { ] work_dir = "/local/csit/csit.infra.etl" } - - # The env stanza configures a list of environment variables to populate - # the task's environment before starting. env { AWS_ACCESS_KEY_ID = "${aws_access_key_id}" AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" @@ -158,165 +134,248 @@ job "${job_name}" { OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" ${ envs } } - - # The "resources" stanza describes the requirements a task needs to - # execute. Resource requirements include memory, network, cpu, and more. - # This ensures the task will execute on a machine that contains enough - # resource capacity. - # - # https://www.nomadproject.io/docs/job-specification/resources - # resources { cpu = ${cpu} memory = ${memory} } } } -# group "${job_name}-rls2206" { -# # The restart stanza configures a tasks's behavior on task failure. Restarts -# # happen on the client that is running the task. -# # -# # https://www.nomadproject.io/docs/job-specification/restart -# # -# restart { -# mode = "fail" -# } -# -# # The constraint allows restricting the set of eligible nodes. Constraints -# # may filter on attributes or client metadata. -# # -# # For more information and examples on the "volume" stanza, please see -# # the online documentation at: -# # -# # https://www.nomadproject.io/docs/job-specification/constraint -# # -# constraint { -# attribute = "$${attr.cpu.arch}" -# operator = "!=" -# value = "arm64" -# } -# -# constraint { -# attribute = "$${node.class}" -# value = "builder" -# } -# -# # The "task" stanza creates an individual unit of work, such as a Docker -# # container, web application, or batch processing. -# # -# # https://www.nomadproject.io/docs/job-specification/task.html -# # -# task "${job_name}-coverage" { -# # The artifact stanza instructs Nomad to fetch and unpack a remote -# # resource, such as a file, tarball, or binary. -# # -# # https://www.nomadproject.io/docs/job-specification/artifact -# # -# artifact { -# source = "git::https://github.com/FDio/csit" -# destination = "local/csit" -# } -# -# # The "driver" parameter specifies the task driver that should be used to -# # run the task. -# driver = "docker" -# -# # The "config" stanza specifies the driver configuration, which is passed -# # directly to the driver to start the task. The details of configurations -# # are specific to each driver, so please see specific driver -# # documentation for more information. -# config { -# image = "${image}" -# command = "gluesparksubmit" -# args = [ -# "--driver-memory", "20g", -# "--executor-memory", "20g", -# "--executor-cores", "2", -# "--master", "local[2]", -# "coverage_rls2206.py" -# ] -# work_dir = "/local/csit/csit.infra.etl" -# } -# -# # The env stanza configures a list of environment variables to populate -# # the task's environment before starting. -# env { -# AWS_ACCESS_KEY_ID = "${aws_access_key_id}" -# AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" -# AWS_DEFAULT_REGION = "${aws_default_region}" -# OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" -# OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" -# OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" -# ${ envs } -# } -# -# # The "resources" stanza describes the requirements a task needs to -# # execute. Resource requirements include memory, network, cpu, and more. -# # This ensures the task will execute on a machine that contains enough -# # resource capacity. -# # -# # https://www.nomadproject.io/docs/job-specification/resources -# # -# resources { -# cpu = ${cpu} -# memory = ${memory} -# } -# } -# task "${job_name}-iterative" { -# # The artifact stanza instructs Nomad to fetch and unpack a remote -# # resource, such as a file, tarball, or binary. -# # -# # https://www.nomadproject.io/docs/job-specification/artifact -# # -# artifact { -# source = "git::https://github.com/FDio/csit" -# destination = "local/csit" -# } -# -# # The "driver" parameter specifies the task driver that should be used to -# # run the task. -# driver = "docker" -# -# # The "config" stanza specifies the driver configuration, which is passed -# # directly to the driver to start the task. The details of configurations -# # are specific to each driver, so please see specific driver -# # documentation for more information. -# config { -# image = "${image}" -# command = "gluesparksubmit" -# args = [ -# "--driver-memory", "20g", -# "--executor-memory", "20g", -# "--executor-cores", "2", -# "--master", "local[2]", -# "iterative_rls2206.py" -# ] -# work_dir = "/local/csit/csit.infra.etl" -# } -# -# # The env stanza configures a list of environment variables to populate -# # the task's environment before starting. -# env { -# AWS_ACCESS_KEY_ID = "${aws_access_key_id}" -# AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" -# AWS_DEFAULT_REGION = "${aws_default_region}" -# OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" -# OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" -# OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" -# ${ envs } -# } -# -# # The "resources" stanza describes the requirements a task needs to -# # execute. Resource requirements include memory, network, cpu, and more. -# # This ensures the task will execute on a machine that contains enough -# # resource capacity. -# # -# # https://www.nomadproject.io/docs/job-specification/resources -# # -# resources { -# cpu = ${cpu} -# memory = ${memory} -# } -# } -# } + group "${job_name}-rls2210" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}-coverage-device" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_device_rls2210.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-coverage-mrr" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_mrr_rls2210.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-coverage-ndrpdr" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_ndrpdr_rls2210.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-coverage-soak" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_soak_rls2210.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-iterative-mrr" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_mrr_rls2210.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-iterative-ndrpdr" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_ndrpdr_rls2210.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-iterative-soak" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_soak_rls2210.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } } -- 2.16.6