From 88e28552cb14f5638efad5732f9f05cbe5b94c5e Mon Sep 17 00:00:00 2001
From: edvs19 <edvs19@inf.ufpr.br>
Date: Thu, 6 Mar 2025 17:12:52 -0300
Subject: [PATCH] teste

---
Review notes (this section is ignored by `git am`):
* The operators stay as direct instantiations inside the DAG function.
  Wrapping each one in an @task function that merely returns the operator
  object would construct the operator at run time without ever executing
  it, and would also lose the `>>` ordering between the three steps.
* The input file is passed through the templated `application_args`
  field, so `process_records()` can be called without arguments.
* Line breaks and indentation in this patch assume 4-space indentation in
  teste_dag.py; the blob ids on the `index` line were not regenerated.

 teste_dag.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/teste_dag.py b/teste_dag.py
index 1ef297d..1bee801 100644
--- a/teste_dag.py
+++ b/teste_dag.py
@@ -17,12 +17,19 @@ repo_home = '/opt/airflow/dags/repo'
         "input_file": Param("fichas_cadastro_individual_5000.jsonl", type="string"),
     },
 )
-def process_records(params: dict):
+def process_records():
+    """Define the landing -> bronze -> silver -> gold pipeline.
+
+    The operators are instantiated directly in the DAG body so that Airflow
+    registers them as tasks; they are chained with `>>` at the end.
+    """
     task_spark = SparkSubmitOperator(
         task_id="landing_to_bronze",
         application=f"{repo_home}/gen_bronze.py",
         application_args=[
-            params["input_file"]
+            # Rendered by Airflow at run time from the DAG's `params`; the
+            # DAG function itself is called with no arguments at parse time.
+            "{{ params.input_file }}"
         ],
         packages="org.apache.hadoop:hadoop-aws:3.3.4"
     )
-- 
GitLab