X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=csit.infra.etl%2Ftrending_mrr.py;fp=csit.infra.etl%2Ftrending.py;h=1ba8c69b1b1d54822e8a35f0dcd7df7d283ffd53;hp=bc27aaa06348c10b226556e4e29707723b2c4a44;hb=bff439b69ee71b654b1da92564ff62de7327fe71;hpb=3f16ce1bbca46437c648d75f6a15d23dae0b8fc1 diff --git a/csit.infra.etl/trending.py b/csit.infra.etl/trending_mrr.py similarity index 81% rename from csit.infra.etl/trending.py rename to csit.infra.etl/trending_mrr.py index bc27aaa063..1ba8c69b1b 100644 --- a/csit.infra.etl/trending.py +++ b/csit.infra.etl/trending_mrr.py @@ -143,30 +143,29 @@ paths = wr.s3.list_objects( filtered_paths = [path for path in paths if "daily" in path or "weekly" in path] -for schema_name in ["mrr", "ndrpdr", "soak"]: - out_sdf = process_json_to_dataframe(schema_name, filtered_paths) - out_sdf.show(truncate=False) - out_sdf.printSchema() - out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - - try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] ) - except EmptyDataFrame: - pass + ) +except EmptyDataFrame: + pass