@dag()
def process_records():
    """
    DAG that runs the landing -> bronze -> silver -> gold flow once per
    benchmark batch size.

    For each size N in ``sizes`` (iteration index i, 1-based) it builds a
    triple of tasks:

      1. ``landing_to_bronze_N``        - Spark job ``gen_bronze_N.py``
         (with the hadoop-aws package so Spark can reach S3-compatible storage)
      2. ``dbt_setup{i}``               - copies the dbt project to /tmp
      3. ``bronze_to_silver_to_gold{i}``- ``dbt deps && dbt build`` in /tmp

    The triples execute strictly sequentially: each size's Spark load only
    starts after the previous size's dbt build has finished.
    """
    # Batch sizes to benchmark, smallest first.
    sizes = [5000, 10000, 20000, 50000, 100000, 200000]

    # Tail task of the previously-built triple; None on the first iteration.
    previous_build = None

    for i, size in enumerate(sizes, 1):
        load_bronze = SparkSubmitOperator(
            application=f"{repo_home}/gen_bronze_{size}.py",
            task_id=f"landing_to_bronze_{size}",
            packages="org.apache.hadoop:hadoop-aws:3.3.4",
        )

        # Fresh copy of the dbt project each round so state never leaks
        # between runs.
        dbt_setup = BashOperator(
            task_id=f"dbt_setup{i}",
            bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
        )

        dbt_build = BashOperator(
            task_id=f"bronze_to_silver_to_gold{i}",
            cwd="/tmp/proj_teste",
            bash_command="dbt deps && dbt build",
        )

        # Link this triple onto the previous triple's tail (if any), then
        # chain the triple internally. Declaring each edge exactly once
        # avoids Airflow's duplicate-dependency warnings that the previous
        # version triggered by re-registering the cross-triple edges in a
        # second loop.
        if previous_build is not None:
            previous_build >> load_bronze
        load_bronze >> dbt_setup >> dbt_build
        previous_build = dbt_build


dag = process_records()