From 4b34648ebaca8ee468cb4cc906b3df68ebe8eb60 Mon Sep 17 00:00:00 2001 From: edvs19 <edvs19@inf.ufpr.br> Date: Thu, 6 Mar 2025 17:05:52 -0300 Subject: [PATCH] teste --- gen_bronze.py | 3 ++- teste_dag.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/gen_bronze.py b/gen_bronze.py index 50c5ea7..2be9984 100644 --- a/gen_bronze.py +++ b/gen_bronze.py @@ -125,7 +125,8 @@ schema = StructType([ print("Esquema definido com sucesso.") # Carregar JSON com o esquema -input_path = "s3a://landing/warehouse" + sys.argv[1] +filename = sys.argv[1] +input_path = "s3a://landing/warehouse/" + filename output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" try: print("Carregando dados do JSON...") diff --git a/teste_dag.py b/teste_dag.py index 8180ed4..6bac0c8 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -21,7 +21,7 @@ def process_records(): @task def task_spark(params: dict): return SparkSubmitOperator( - task_id=f"landing_to_bronze", + task_id="landing_to_bronze", application=f"{repo_home}/gen_bronze.py", application_args=[ params["input_file"] @@ -32,14 +32,14 @@ def process_records(): @task def task_dbt_setup(): return BashOperator( - task_id=f"dbt_setup", + task_id="dbt_setup", bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", ) @task def task_bronze_to_gold(): return BashOperator( - task_id=f"bronze_to_silver_to_gold", + task_id="bronze_to_silver_to_gold", cwd="/tmp/proj_teste", bash_command="dbt deps && dbt build", ) -- GitLab