From 53c37125ed55a3f48a5748116416977aaf0d0232 Mon Sep 17 00:00:00 2001
From: lcd22 <lcd22@inf.ufpr.br>
Date: Thu, 6 Mar 2025 12:19:13 +0000
Subject: [PATCH] revert to the original

---
 dags.py | 91 ++++++++++++++++-----------------------------------------
 1 file changed, 25 insertions(+), 66 deletions(-)

diff --git a/dags.py b/dags.py
index d83707e..6b820af 100644
--- a/dags.py
+++ b/dags.py
@@ -1,80 +1,39 @@
 import datetime
+
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.bash import BashOperator
-from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
-
-repo_home = '/opt/airflow/dags/repo'
-
-
-from airflow.hooks.base_hook import BaseHook
-from airflow.models import Connection
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-
-# Define the connection credentials and details
-conn_id = 'minio_conn'
-conn_type = 'S3'
-host = 'http://minio.minio-cluster.svc.cluster.local:9000'
-login = 'minioadmin'
-password = 'minioadmin'
-extra = '{"aws_access_key_id": "minioadmin", "aws_secret_access_key": "minioadmin", "region_name": "us-east-1", "endpoint_url": "http://minio.minio-cluster.svc.cluster.local:9000", "is_secure": false, "verify": false}'
-# Create the connection
-conn = Connection(
-    conn_id=conn_id,
-    conn_type=conn_type,
-    host=host,
-    login=login,
-    password=password,
-    extra=extra
-)
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
 
-# Register the connection in Airflow
-session = BaseHook.get_session()
-session.add(conn)
-session.commit()
-s3_hook = S3Hook(aws_conn_id='minio_conn')
+repo_home = '/opt/airflow/dags/repo'
 
 
 @dag()
 def process_records():
-    """
-    DAG that runs the landing -> bronze -> silver -> gold flow and cleans the buckets
-    """
-
-    # Task that processes the data from landing into bronze
-    task_1 = SparkSubmitOperator(
-        application=f"{repo_home}/gen_bronze.py",
-        task_id="landing_to_bronze",
-        packages="org.apache.hadoop:hadoop-aws:3.3.4"
-    )
-
-    # Task that sets up dbt
-    task_2 = BashOperator(
-        task_id="dbt_setup",
-        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
-    )
-
-    # Task that runs dbt and transforms the data from bronze into silver and gold
-    task_3 = BashOperator(
-        task_id="bronze_to_silver_to_gold",
-        cwd="/tmp/proj_teste",
-        bash_command="dbt deps && dbt build",
-    )
-
-    # New task that cleans the gold, silver and bronze MinIO buckets
-    task_4 = BashOperator(
-        task_id="clean_minio_buckets",
-        bash_command="""
-        mc alias set myminio http://minio:9000 minioadmin minioadmin
-        mc rm --recursive myminio/gold/*
-        mc rm --recursive myminio/silver/*
-        mc rm --recursive myminio/bronze/*
-        """,
-    )
+    """
+    DAG that runs the landing -> bronze -> silver -> gold flow
+    """
+
+    task_1 = SparkSubmitOperator(
+        application=f"{repo_home}/gen_bronze.py",
+        task_id="landing_to_bronze",
+        packages = "org.apache.hadoop:hadoop-aws:3.3.4"
+    )
+
+    task_2 = BashOperator(
+        task_id="dbt_setup",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_3 = BashOperator(
+        task_id="bronze_to_silver_to_gold",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_1 >> task_2 >> task_3
 
-    # Define the task execution order
-    task_1 >> task_2 >> task_3 >> task_4
 
 
 dag = process_records()
-- 
GitLab
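
Note on the removed block: the deleted code registered minio_conn at DAG parse time via BaseHook.get_session(), which does not appear to be an API BaseHook provides, and parse-time side effects run on every scheduler parse of the file. If the connection is still needed after this revert, a minimal sketch of creating it once from a separate bootstrap script could look like the following. It assumes Airflow 2.x; the ensure_minio_conn helper, the "aws" connection type (the removed code used "S3"), and the reuse of the removed code's placeholder MinIO endpoint and credentials are illustrative assumptions, not part of this repository.

    # bootstrap_minio_conn.py -- illustrative sketch only, not part of this patch.
    # Assumes Airflow 2.x and reuses the placeholder values from the removed code.
    from airflow import settings
    from airflow.models import Connection


    def ensure_minio_conn() -> None:
        """Create the minio_conn connection once, if it does not already exist."""
        session = settings.Session()
        try:
            exists = session.query(Connection).filter_by(conn_id="minio_conn").first()
            if not exists:
                session.add(
                    Connection(
                        conn_id="minio_conn",
                        conn_type="aws",  # assumed type; the removed code used "S3"
                        login="minioadmin",
                        password="minioadmin",
                        extra='{"endpoint_url": "http://minio.minio-cluster.svc.cluster.local:9000"}',
                    )
                )
                session.commit()
        finally:
            session.close()


    if __name__ == "__main__":
        ensure_minio_conn()

Running such a script once (for example from an init container or a deployment job) keeps connection setup out of dags.py, so the DAG file stays limited to task definitions as in the reverted version above.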