diff --git a/dags.py b/dags.py
index 5f6f18d71c091f588b1e48d37bed7cc45855c373..d83707ef754636a8c42ef2ca19d62751abc87051 100644
--- a/dags.py
+++ b/dags.py
@@ -1,39 +1,80 @@
 import datetime
-
 from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.bash import BashOperator
-
 from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
 
 repo_home = '/opt/airflow/dags/repo'
+from airflow import settings
+from airflow.models import Connection
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+# Define the connection credentials and details
+conn_id = 'minio_conn'
+conn_type = 'S3'
+host = 'http://minio.minio-cluster.svc.cluster.local:9000'
+login = 'minioadmin'
+password = 'minioadmin'
+extra = '{"aws_access_key_id": "minioadmin", "aws_secret_access_key": "minioadmin", "region_name": "us-east-1", "endpoint_url": "http://minio.minio-cluster.svc.cluster.local:9000", "is_secure": false, "verify": false}'
+
+# Create the connection
+conn = Connection(
+    conn_id=conn_id,
+    conn_type=conn_type,
+    host=host,
+    login=login,
+    password=password,
+    extra=extra
+)
+
+# Register the connection in Airflow's metadata database. This module is
+# re-executed on every DAG parse, so skip the insert when the connection
+# already exists (conn_id has a unique constraint).
+session = settings.Session()
+if not session.query(Connection).filter(Connection.conn_id == conn_id).first():
+    session.add(conn)
+    session.commit()
+session.close()
+
+s3_hook = S3Hook(aws_conn_id='minio_conn')
+
+
 @dag()
 def process_records():
-    """
-    DAG that runs the landing -> bronze -> silver -> gold flow
-    """
-
-    task_1 = SparkSubmitOperator(
-        application=f"{repo_home}/gen_bronze.py",
-        task_id="landing_to_bronze",
-        packages = "org.apache.hadoop:hadoop-aws:3.3.4"
-    )
-
-    task_2 = BashOperator(
-        task_id="dbt_setup",
-        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
-    )
-
-    task_3 = BashOperator(
-        task_id="bronze_to_silver_to_gold",
-        cwd="/tmp/proj_teste",
-        bash_command="dbt deps && dbt build",
-    )
-
-    task_1 >> task_2 >> task_3
+    """
+    DAG that runs the landing -> bronze -> silver -> gold flow and cleans the buckets
+    """
+
+    # Task that processes data from landing to bronze
+    task_1 = SparkSubmitOperator(
+        application=f"{repo_home}/gen_bronze.py",
+        task_id="landing_to_bronze",
+        packages="org.apache.hadoop:hadoop-aws:3.3.4"
+    )
+
+    # Task that sets up the dbt project
+    task_2 = BashOperator(
+        task_id="dbt_setup",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    # Task that runs dbt to transform the data from bronze to silver and gold
+    task_3 = BashOperator(
+        task_id="bronze_to_silver_to_gold",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    # New task that cleans the gold, silver and bronze MinIO buckets.
+    # mc refuses a recursive delete without --force, and the alias should
+    # point at the same endpoint as the minio_conn connection above.
+    task_4 = BashOperator(
+        task_id="clean_minio_buckets",
+        bash_command="""
+        mc alias set myminio http://minio.minio-cluster.svc.cluster.local:9000 minioadmin minioadmin
+        mc rm --recursive --force myminio/gold
+        mc rm --recursive --force myminio/silver
+        mc rm --recursive --force myminio/bronze
+        """,
+    )
+
+    # Define the task execution order
+    task_1 >> task_2 >> task_3 >> task_4
 
 dag = process_records()
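
A note on the connection bootstrap: writing to the metadata database from a DAG file works, but Airflow can also resolve a connection from an `AIRFLOW_CONN_<CONN_ID>` environment variable set on the scheduler/worker, which avoids the session handling entirely. A minimal sketch, reusing the credentials and endpoint from the diff (the `aws` URI scheme and the percent-encoded `endpoint_url` query parameter assume the Amazon provider's URI format):

```python
import os

# A sketch, not part of the PR: with this variable set in the Airflow
# environment, minio_conn is resolved at runtime without any DB writes.
os.environ["AIRFLOW_CONN_MINIO_CONN"] = (
    "aws://minioadmin:minioadmin@/"
    "?endpoint_url=http%3A%2F%2Fminio.minio-cluster.svc.cluster.local%3A9000"
    "&region_name=us-east-1"
)
```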
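The diff instantiates `s3_hook` at module level but never uses it. If it is meant as a smoke test of the new connection, a hedged sketch of what that could look like (bucket names taken from the cleanup task):

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# A sketch: list each medallion bucket through the minio_conn connection.
s3_hook = S3Hook(aws_conn_id="minio_conn")
for bucket in ("bronze", "silver", "gold"):
    keys = s3_hook.list_keys(bucket_name=bucket) or []  # empty when no objects
    print(f"{bucket}: {len(keys)} objects")
```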
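The `clean_minio_buckets` task shells out to `mc`, which therefore has to be installed in the worker image. An alternative that stays on the `minio_conn` connection and needs no extra binary, sketched with the Amazon provider's `S3Hook` (`list_keys` and `delete_objects` are its standard calls; treat the task wiring as illustrative, not as the PR's implementation):

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task(task_id="clean_minio_buckets")
def clean_minio_buckets():
    # Delete every object in the medallion buckets via minio_conn.
    hook = S3Hook(aws_conn_id="minio_conn")
    for bucket in ("gold", "silver", "bronze"):
        keys = hook.list_keys(bucket_name=bucket) or []
        if keys:
            hook.delete_objects(bucket=bucket, keys=keys)
```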