Skip to content
Snippets Groups Projects
Commit a7945aa8 authored by edvs19's avatar edvs19
Browse files

teste

parent 735dac70
No related branches found
No related tags found
No related merge requests found
...@@ -125,7 +125,7 @@ schema = StructType([ ...@@ -125,7 +125,7 @@ schema = StructType([
print("Esquema definido com sucesso.") print("Esquema definido com sucesso.")
# Carregar JSON com o esquema # Carregar JSON com o esquema
input_path = "s3a://landing/" + sys.argv[1] input_path = "s3a://landing/warehouse" + sys.argv[1]
output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet"
try: try:
print("Carregando dados do JSON...") print("Carregando dados do JSON...")
......
...@@ -14,7 +14,7 @@ repo_home = '/opt/airflow/dags/repo' ...@@ -14,7 +14,7 @@ repo_home = '/opt/airflow/dags/repo'
@dag( @dag(
dag_id="process_records_teste", dag_id="process_records_teste",
params={ params={
"input_file": Param("fichas_cadastro_individual",type="string"), "input_file": Param("fichas_cadastro_individual_5000.jsonl", type="string"),
}, },
) )
def process_records(): def process_records():
...@@ -30,14 +30,14 @@ def process_records(): ...@@ -30,14 +30,14 @@ def process_records():
) )
@task @task
def task_dbt_setup(params: dict): def task_dbt_setup():
return BashOperator( return BashOperator(
task_id=f"dbt_setup", task_id=f"dbt_setup",
bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
) )
@task @task
def task_bronze_to_gold(params: dict): def task_bronze_to_gold():
return BashOperator( return BashOperator(
task_id=f"bronze_to_silver_to_gold", task_id=f"bronze_to_silver_to_gold",
cwd="/tmp/proj_teste", cwd="/tmp/proj_teste",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment