diff --git a/dags.py b/dags.py
index bf437efdac3db24a67a4474d036cefde76ae1b4f..96db8a492da002f8ebf7d488d69d70847de1c77d 100644
--- a/dags.py
+++ b/dags.py
@@ -15,6 +15,11 @@ def process_records():
     """
     DAG that runs the landing -> bronze -> silver -> gold flow
     """
+    task_1 = SparkSubmitOperator(
+        application=f"{repo_home}/gen_bronze_5000.py",
+        task_id="landing_to_bronze_5000",
+        packages = "org.apache.hadoop:hadoop-aws:3.3.4"
+    )

     task_2 = BashOperator(
         task_id="dbt_setup",
@@ -28,41 +33,92 @@ def process_records():
     )

     task_4 = SparkSubmitOperator(
-        application=f"{repo_home}/gen_bronze_5000.py",
-        task_id="landing_to_bronze_5000",
-        packages = "org.apache.hadoop:hadoop-aws:3.3.4"
-    )
-
-    task_5 = SparkSubmitOperator(
         application=f"{repo_home}/gen_bronze_10000.py",
         task_id="landing_to_bronze_10000",
         packages = "org.apache.hadoop:hadoop-aws:3.3.4"
     )
-    task_6 = SparkSubmitOperator(
+    task_5 = BashOperator(
+        task_id="dbt_setup_10000",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_6 = BashOperator(
+        task_id="bronze_to_silver_to_gold_10000",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_7 = SparkSubmitOperator(
         application=f"{repo_home}/gen_bronze_20000.py",
         task_id="landing_to_bronze_20000",
         packages = "org.apache.hadoop:hadoop-aws:3.3.4"
     )
-    task_7 = SparkSubmitOperator(
+
+    task_8 = BashOperator(
+        task_id="dbt_setup_20000",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_9 = BashOperator(
+        task_id="bronze_to_silver_to_gold_20000",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_10 = SparkSubmitOperator(
         application=f"{repo_home}/gen_bronze_50000.py",
         task_id="landing_to_bronze_50000",
         packages = "org.apache.hadoop:hadoop-aws:3.3.4"
     )

-    task_8 = SparkSubmitOperator(
+    task_11 = BashOperator(
+        task_id="dbt_setup_50000",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_12 = BashOperator(
+        task_id="bronze_to_silver_to_gold_50000",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_13 = SparkSubmitOperator(
         application=f"{repo_home}/gen_bronze_100000.py",
         task_id="landing_to_bronze_100000",
         packages = "org.apache.hadoop:hadoop-aws:3.3.4"
     )

-    task_9 = SparkSubmitOperator(
+    task_14 = BashOperator(
+        task_id="dbt_setup_100000",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_15 = BashOperator(
+        task_id="bronze_to_silver_to_gold_100000",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_16 = SparkSubmitOperator(
         application=f"{repo_home}/gen_bronze_200000.py",
         task_id="landing_to_bronze_200000",
         packages = "org.apache.hadoop:hadoop-aws:3.3.4"
     )

-    task_4 >> task_2 >> task_3 >> task_5 >> task_2 >> task_3 >> task_6 >> task_2 >> task_3 >> task_7 >> task_2 >> task_3 >> task_8 >> task_2 >> task_3 >> task_9 >> task_2 >> task_3
+    task_17 = BashOperator(
+        task_id="dbt_setup_200000",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_18 = BashOperator(
+        task_id="bronze_to_silver_to_gold_200000",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+    task_1 >> task_2 >> task_3 >> task_4 >> task_5 >> task_6 >> task_7 >> task_8 >> task_9 >> task_10 >> task_11 >> task_12 >> task_13 >> task_14 >> task_15 >> task_16 >> task_17 >> task_18

 dag = process_records()
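
The six size-specific triplets above (Spark landing-to-bronze job, dbt project copy, dbt build) differ only in the record count, so the same linear chain can be generated in a loop rather than with eighteen hand-numbered tasks. The sketch below is only an illustration of that pattern, not part of the change: the repo_home value, the @dag decorator arguments (schedule, start_date, catchup), and the per-size task_id suffixes are assumptions, since none of them appear in the hunks.

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

repo_home = "/opt/airflow/repo"  # assumption: the real path is defined elsewhere in dags.py


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)  # assumed DAG arguments
def process_records():
    """DAG that runs the landing -> bronze -> silver -> gold flow for each dataset size."""
    previous = None
    for size in (5000, 10000, 20000, 50000, 100000, 200000):
        # Spark job that lands the file with `size` records into the bronze layer.
        to_bronze = SparkSubmitOperator(
            task_id=f"landing_to_bronze_{size}",
            application=f"{repo_home}/gen_bronze_{size}.py",
            packages="org.apache.hadoop:hadoop-aws:3.3.4",
        )
        # Fresh copy of the dbt project before each build.
        dbt_setup = BashOperator(
            task_id=f"dbt_setup_{size}",
            bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
        )
        # dbt builds the silver and gold models on top of the bronze data.
        dbt_build = BashOperator(
            task_id=f"bronze_to_silver_to_gold_{size}",
            cwd="/tmp/proj_teste",
            bash_command="dbt deps && dbt build",
        )
        # Keep the runs strictly sequential, as in the hand-written chain in the diff.
        to_bronze >> dbt_setup >> dbt_build
        if previous is not None:
            previous >> to_bronze
        previous = dbt_build


dag = process_records()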