diff --git a/gen_bronze.py b/gen_bronze.py
index 62ed0f8da10f84e3fec9396af67c3173ed4798ee..06af09749bd21213701f462057d7cca9c21d9ef5 100644
--- a/gen_bronze.py
+++ b/gen_bronze.py
@@ -125,7 +125,7 @@ schema = StructType([
 print("Schema defined successfully.")
 
 # Load the JSON using the schema
-input_path = "s3a://landing/" + sys.args[1]
+input_path = "s3a://landing/" + sys.argv[1]
 output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet"
 try:
     print("Loading JSON data...")
diff --git a/teste_dag.py b/teste_dag.py
index bcf28a3787c9b80e052285941e04bfae93a53185..39a33338409d13cfff50fbe6e87363dcde37f6bf 100644
--- a/teste_dag.py
+++ b/teste_dag.py
@@ -4,52 +4,12 @@ from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.bash import BashOperator
+from airflow.models.param import Param
 from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
-from airflow.models import Variable
-
-
-
 
 repo_home = '/opt/airflow/dags/repo'
 
 
-# @dag()
-# def process_records():
-#     """
-#     DAG that runs the landing -> bronze -> silver -> gold flow
-#     """
-#     # SparkSubmitOperator
-#     task_spark = SparkSubmitOperator(
-#         task_id=f"landing_to_bronze",
-#         application=f"{repo_home}/gen_bronze.py",
-#         application_args=[
-#             Variable.get("input_file")
-#         ],
-#         packages="org.apache.hadoop:hadoop-aws:3.3.4"
-#     )
-
-#     # BashOperator 1 - dbt_setup
-#     task_dbt_setup = BashOperator(
-#         task_id=f"dbt_setup",
-#         bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
-#     )
-
-#     # BashOperator 2 - bronze_to_silver_to_gold
-#     task_bronze_to_gold = BashOperator(
-#         task_id=f"bronze_to_silver_to_gold",
-#         cwd="/tmp/proj_teste",
-#         bash_command="dbt deps && dbt build",
-#     )
-
-
-#     task_spark >> task_dbt_setup >> task_bronze_to_gold
-
-# dag = process_records()
-
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.models.param import Param
 
 @dag(
     dag_id="process_records_teste",
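
Note: the teste_dag.py hunk is truncated right after dag_id="process_records_teste", so the diff never shows how the newly imported Param replaces the deleted Variable.get("input_file") lookup. A minimal sketch of what the rewritten DAG might look like, assuming it keeps the three tasks from the deleted commented-out version and exposes the input file as a run parameter; the param name "input_file", its empty default, and schedule=None are assumptions, not part of this diff:

from airflow.decorators import dag
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

repo_home = '/opt/airflow/dags/repo'

@dag(
    dag_id="process_records_teste",
    schedule=None,  # assumption: the DAG is triggered manually with params
    params={"input_file": Param("", type="string")},  # hypothetical param name/default
)
def process_records_teste():
    # The templated param replaces the old Variable.get("input_file") lookup,
    # so the file name is supplied per run instead of via a global Variable.
    task_spark = SparkSubmitOperator(
        task_id="landing_to_bronze",
        application=f"{repo_home}/gen_bronze.py",
        application_args=["{{ params.input_file }}"],
        packages="org.apache.hadoop:hadoop-aws:3.3.4",
    )

    # Copy the dbt project to a writable location, then build bronze -> silver -> gold.
    task_dbt_setup = BashOperator(
        task_id="dbt_setup",
        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
    )
    task_bronze_to_gold = BashOperator(
        task_id="bronze_to_silver_to_gold",
        cwd="/tmp/proj_teste",
        bash_command="dbt deps && dbt build",
    )

    task_spark >> task_dbt_setup >> task_bronze_to_gold

process_records_teste()

With this shape, a run triggered via airflow dags trigger process_records_teste --conf '{"input_file": "some_file.json"}' (file name illustrative) would flow through the templated application_args into gen_bronze.py's sys.argv[1], which the first hunk fixes from the nonexistent sys.args.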