From ef00bf8f102635af669d27d5ae5f5ffffe1b3b23 Mon Sep 17 00:00:00 2001
From: edvs19 <edvs19@inf.ufpr.br>
Date: Thu, 6 Mar 2025 19:18:36 -0300
Subject: [PATCH] teste

---
 teste_dag.py | 43 ++++++++++++++++++++-----------------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git a/teste_dag.py b/teste_dag.py
index 04b0585..acecade 100644
--- a/teste_dag.py
+++ b/teste_dag.py
@@ -19,32 +19,29 @@ with DAG(
         "input_file": Param("invalid_file.jsonl", type="string"),
     },
 ) as dag1:
+    dag1.log.info(f"Input file {dag1.params["input_file"]}")
     
-    @task
-    def task_spark(params: dict):
-        SparkSubmitOperator(
-            task_id="landing_to_bronze",
-            application=f"{repo_home}/gen_bronze.py",
-            application_args=[
-                params["input_file"]
-            ],
-            packages="org.apache.hadoop:hadoop-aws:3.3.4"
-        )
+    file = dag1.params["input_file"]
 
-    @task
-    def task_dbt_setup():
-        BashOperator(
-            task_id="dbt_setup",
-            bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
-        )
+    task_spark = SparkSubmitOperator(
+        task_id="landing_to_bronze",
+        application=f"{repo_home}/gen_bronze.py",
+        application_args=[
+            file
+        ],
+        packages="org.apache.hadoop:hadoop-aws:3.3.4"
+    )
 
-    @task
-    def task_bronze_to_gold():
-        BashOperator(
-            task_id="bronze_to_silver_to_gold",
-            cwd="/tmp/proj_teste",
-            bash_command="dbt deps && dbt build",
-        )
+    task_dbt_setup = BashOperator(
+        task_id="dbt_setup",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_bronze_to_gold = BashOperator(
+        task_id="bronze_to_silver_to_gold",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
 
     task_spark >> task_dbt_setup >> task_bronze_to_gold
 
-- 
GitLab