diff --git a/gen_bronze.py b/gen_bronze.py index 50c5ea7cf66727765ae3c5f880d892bcec6a68df..2be99842db45618935a8bb066c1179b7bb0fe6ec 100644 --- a/gen_bronze.py +++ b/gen_bronze.py @@ -125,7 +125,8 @@ schema = StructType([ print("Esquema definido com sucesso.") # Carregar JSON com o esquema -input_path = "s3a://landing/warehouse" + sys.argv[1] +filename = sys.argv[1] +input_path = "s3a://landing/warehouse/" + filename output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" try: print("Carregando dados do JSON...") diff --git a/teste_dag.py b/teste_dag.py index 8180ed4e50280630cceff62da5bd728ee1409626..6bac0c8b09d24b3c75642ae6463fed8730d883ea 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -21,7 +21,7 @@ def process_records(): @task def task_spark(params: dict): return SparkSubmitOperator( - task_id=f"landing_to_bronze", + task_id="landing_to_bronze", application=f"{repo_home}/gen_bronze.py", application_args=[ params["input_file"] @@ -32,14 +32,14 @@ def process_records(): @task def task_dbt_setup(): return BashOperator( - task_id=f"dbt_setup", + task_id="dbt_setup", bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", ) @task def task_bronze_to_gold(): return BashOperator( - task_id=f"bronze_to_silver_to_gold", + task_id="bronze_to_silver_to_gold", cwd="/tmp/proj_teste", bash_command="dbt deps && dbt build", )