-for schema_name in ["mrr", "ndrpdr", "soak"]:
- out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
- out_sdf.show(truncate=False)
- out_sdf.printSchema()
- out_sdf = out_sdf \
- .withColumn("year", lit(datetime.now().year)) \
- .withColumn("month", lit(datetime.now().month)) \
- .withColumn("day", lit(datetime.now().day)) \
- .repartition(1)
-
- try:
- wr.s3.to_parquet(
- df=out_sdf.toPandas(),
- path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
- dataset=True,
- partition_cols=["test_type", "year", "month", "day"],
- compression="snappy",
- use_threads=True,
- mode="overwrite_partitions",
- boto3_session=session.Session(
- aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
- aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
- region_name=environ["OUT_AWS_DEFAULT_REGION"]
- )
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]