diff --git a/teste_dag.py b/teste_dag.py
index bc35b36610a28577716d7f6757b85cf3b6a89e4b..9325063da6f5278c5c6ae7254b36f368ad54794a 100644
--- a/teste_dag.py
+++ b/teste_dag.py
@@ -18,37 +18,27 @@ repo_home = '/opt/airflow/dags/repo'
 #     },
 )
 def process_records():
-    @task
-    def task_spark():
-        SparkSubmitOperator(
-            task_id="landing_to_bronze",
-            application=f"{repo_home}/gen_bronze.py",
-            application_args=[
-                "fichas_cadastro_individual_5000.jsonl"
-            ],
-            packages="org.apache.hadoop:hadoop-aws:3.3.4"
-        )
-
-    @task
-    def task_dbt_setup():
-        BashOperator(
-            task_id="dbt_setup",
-            bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
-        )
-
-    @task
-    def task_bronze_to_gold():
-        BashOperator(
-            task_id="bronze_to_silver_to_gold",
-            cwd="/tmp/proj_teste",
-            bash_command="dbt deps && dbt build",
-        )
-
-    task1 = task_spark
-    task2 = task_dbt_setup
-    task3 = task_bronze_to_gold
-
-    task1 >> task2 >> task3
+    task_spark = SparkSubmitOperator(
+        task_id="landing_to_bronze",
+        application=f"{repo_home}/gen_bronze.py",
+        application_args=[
+            "fichas_cadastro_individual_5000.jsonl"
+        ],
+        packages="org.apache.hadoop:hadoop-aws:3.3.4"
+    )
+
+    task_dbt_setup = BashOperator(
+        task_id="dbt_setup",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_bronze_to_gold = BashOperator(
+        task_id="bronze_to_silver_to_gold",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_spark >> task_dbt_setup >> task_bronze_to_gold
 
 
 dag = process_records()
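
Context for the change: in the old version the SparkSubmitOperator and BashOperator instances were created inside @task-decorated functions, where merely instantiating a classic operator does not execute it, and the decorated functions were never called, so no real tasks or dependencies ended up in the DAG. The rewrite instantiates the operators directly in the @dag function body, which is the intended pattern. Below is a minimal sketch of what the refactored teste_dag.py might look like in full, assuming standard Airflow 2.x imports; the @dag arguments shown are placeholders, since the real decorator header and the commented-out block are outside the hunk.

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

repo_home = '/opt/airflow/dags/repo'


@dag(
    schedule=None,   # placeholder: the real schedule/default_args are not shown in the diff
    catchup=False,   # placeholder
)
def process_records():
    # Operators are created directly in the @dag function body, so they are
    # registered on the DAG and >> sets real dependencies between them.
    task_spark = SparkSubmitOperator(
        task_id="landing_to_bronze",
        application=f"{repo_home}/gen_bronze.py",
        application_args=["fichas_cadastro_individual_5000.jsonl"],
        packages="org.apache.hadoop:hadoop-aws:3.3.4",
    )

    task_dbt_setup = BashOperator(
        task_id="dbt_setup",
        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
    )

    task_bronze_to_gold = BashOperator(
        task_id="bronze_to_silver_to_gold",
        cwd="/tmp/proj_teste",
        bash_command="dbt deps && dbt build",
    )

    task_spark >> task_dbt_setup >> task_bronze_to_gold


dag = process_records()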