diff --git a/gen_bronze.py b/gen_bronze.py index 06af09749bd21213701f462057d7cca9c21d9ef5..50c5ea7cf66727765ae3c5f880d892bcec6a68df 100644 --- a/gen_bronze.py +++ b/gen_bronze.py @@ -125,7 +125,7 @@ schema = StructType([ print("Esquema definido com sucesso.") # Carregar JSON com o esquema -input_path = "s3a://landing/" + sys.argv[1] +input_path = "s3a://landing/warehouse" + sys.argv[1] output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" try: print("Carregando dados do JSON...") diff --git a/teste_dag.py b/teste_dag.py index 39a33338409d13cfff50fbe6e87363dcde37f6bf..8180ed4e50280630cceff62da5bd728ee1409626 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -14,7 +14,7 @@ repo_home = '/opt/airflow/dags/repo' @dag( dag_id="process_records_teste", params={ - "input_file": Param("fichas_cadastro_individual",type="string"), + "input_file": Param("fichas_cadastro_individual_5000.jsonl", type="string"), }, ) def process_records(): @@ -30,14 +30,14 @@ def process_records(): ) @task - def task_dbt_setup(params: dict): + def task_dbt_setup(): return BashOperator( task_id=f"dbt_setup", bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", ) @task - def task_bronze_to_gold(params: dict): + def task_bronze_to_gold(): return BashOperator( task_id=f"bronze_to_silver_to_gold", cwd="/tmp/proj_teste",