From 735dac7064b820e5dbe1a23b7fe0364273803c50 Mon Sep 17 00:00:00 2001 From: edvs19 <edvs19@inf.ufpr.br> Date: Thu, 6 Mar 2025 15:25:40 -0300 Subject: [PATCH] teste --- gen_bronze.py | 2 +- teste_dag.py | 42 +----------------------------------------- 2 files changed, 2 insertions(+), 42 deletions(-) diff --git a/gen_bronze.py b/gen_bronze.py index 62ed0f8..06af097 100644 --- a/gen_bronze.py +++ b/gen_bronze.py @@ -125,7 +125,7 @@ schema = StructType([ print("Esquema definido com sucesso.") # Carregar JSON com o esquema -input_path = "s3a://landing/" + sys.args[1] +input_path = "s3a://landing/" + sys.argv[1] output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" try: print("Carregando dados do JSON...") diff --git a/teste_dag.py b/teste_dag.py index bcf28a3..39a3333 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -4,52 +4,12 @@ from airflow import DAG from airflow.decorators import dag, task from airflow.operators.empty import EmptyOperator from airflow.operators.bash import BashOperator +from airflow.models.param import Param from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator -from airflow.models import Variable - - - repo_home = '/opt/airflow/dags/repo' -# @dag() -# def process_records(): -# """ -# DAG que realiza o fluxo landing -> bronze -> silver -> gold -# """ -# # SparkSubmitOperator -# task_spark = SparkSubmitOperator( -# task_id=f"landing_to_bronze", -# application=f"{repo_home}/gen_bronze.py", -# application_args=[ -# Variable.get("input_file") -# ], -# packages="org.apache.hadoop:hadoop-aws:3.3.4" -# ) - -# # BashOperator 1 - dbt_setup -# task_dbt_setup = BashOperator( -# task_id=f"dbt_setup", -# bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", -# ) - -# # BashOperator 2 - bronze_to_silver_to_gold -# task_bronze_to_gold = BashOperator( -# task_id=f"bronze_to_silver_to_gold", -# cwd="/tmp/proj_teste", -# bash_command="dbt deps && dbt build", -# ) - - -# task_spark >> task_dbt_setup >> task_bronze_to_gold - -# dag = process_records() - - -from airflow import DAG -from airflow.decorators import task -from airflow.models.param import Param @dag( dag_id="process_records_teste", -- GitLab