Skip to content
Snippets Groups Projects
Commit 517416ad authored by lcd22's avatar lcd22
Browse files

melhorando código

parent 1003c213
No related branches found
No related tags found
No related merge requests found
def process_records():
    """
    DAG that runs the landing -> bronze -> silver -> gold flow.

    For each dataset size, three chained tasks are created:
      1. SparkSubmitOperator -- runs the Spark job that loads the landing
         data of that size into the bronze layer.
      2. BashOperator (dbt_setup) -- copies the dbt project into a clean
         /tmp workspace so every run starts from a pristine state.
      3. BashOperator (bronze_to_silver_to_gold) -- runs `dbt deps && dbt
         build` to materialize the silver and gold layers.

    The chains run strictly sequentially: each size's Spark task only
    starts after the previous size's dbt build has finished.
    """
    # Dataset sizes to process; one three-task chain is built per size.
    sizes = [5000, 10000, 20000, 50000, 100000, 200000]

    # Last task of the previously built chain (None for the first size).
    previous_tail = None

    for i, size in enumerate(sizes, 1):
        # Landing -> bronze: Spark job generated per dataset size.
        task_spark = SparkSubmitOperator(
            application=f"{repo_home}/gen_bronze_{size}.py",
            task_id=f"landing_to_bronze_{size}",
            packages="org.apache.hadoop:hadoop-aws:3.3.4",
        )

        # Reset the dbt workspace so stale artifacts cannot leak between runs.
        task_dbt_setup = BashOperator(
            task_id=f"dbt_setup{i}",
            bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
        )

        # Bronze -> silver -> gold: dbt builds the downstream layers.
        task_bronze_to_gold = BashOperator(
            task_id=f"bronze_to_silver_to_gold{i}",
            cwd="/tmp/proj_teste",
            bash_command="dbt deps && dbt build",
        )

        # Chain this size's tasks after the previous size's final task.
        # (A single link here replaces the redundant second chaining pass
        # the earlier version performed after the loop.)
        if previous_tail is not None:
            previous_tail >> task_spark
        task_spark >> task_dbt_setup >> task_bronze_to_gold
        previous_tail = task_bronze_to_gold


dag = process_records()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment