feat(etl): Add rls2210
[csit.git] / csit.infra.etl / trending_mrr.py
similarity index 81%
rename from csit.infra.etl/trending.py
rename to csit.infra.etl/trending_mrr.py
index bc27aaa..1ba8c69 100644 (file)
@@ -143,30 +143,29 @@ paths = wr.s3.list_objects(
 
 filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
 
-for schema_name in ["mrr", "ndrpdr", "soak"]:
-    out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
-    out_sdf.show(truncate=False)
-    out_sdf.printSchema()
-    out_sdf = out_sdf \
-        .withColumn("year", lit(datetime.now().year)) \
-        .withColumn("month", lit(datetime.now().month)) \
-        .withColumn("day", lit(datetime.now().day)) \
-        .repartition(1)
-
-    try:
-        wr.s3.to_parquet(
-            df=out_sdf.toPandas(),
-            path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
-            dataset=True,
-            partition_cols=["test_type", "year", "month", "day"],
-            compression="snappy",
-            use_threads=True,
-            mode="overwrite_partitions",
-            boto3_session=session.Session(
-                aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
-                aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
-                region_name=environ["OUT_AWS_DEFAULT_REGION"]
-            )
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+    .withColumn("year", lit(datetime.now().year)) \
+    .withColumn("month", lit(datetime.now().month)) \
+    .withColumn("day", lit(datetime.now().day)) \
+    .repartition(1)
+
+try:
+    wr.s3.to_parquet(
+        df=out_sdf.toPandas(),
+        path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+        dataset=True,
+        partition_cols=["test_type", "year", "month", "day"],
+        compression="snappy",
+        use_threads=True,
+        mode="overwrite_partitions",
+        boto3_session=session.Session(
+            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+            region_name=environ["OUT_AWS_DEFAULT_REGION"]
         )
-    except EmptyDataFrame:
-        pass
+    )
+except EmptyDataFrame:
+    pass