Skip to content
Snippets Groups Projects
Commit 53c37125 authored by lcd22's avatar lcd22
Browse files

retorna pro original

parent 8aa60237
No related branches found
No related tags found
No related merge requests found
import datetime
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow import settings
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Directory where the DAG repository is checked out inside the Airflow image.
repo_home = '/opt/airflow/dags/repo'

# Credentials and endpoint details for the MinIO (S3-compatible) connection.
# NOTE(review): the credentials are hard-coded in the DAG file; consider
# supplying them through a secrets backend or environment variables instead.
conn_id = 'minio_conn'
conn_type = 'S3'
host = 'http://minio.minio-cluster.svc.cluster.local:9000'
login = 'minioadmin'
password = 'minioadmin'
extra = '{"aws_access_key_id": "minioadmin", "aws_secret_access_key": "minioadmin", "region_name": "us-east-1", "endpoint_url": "http://minio.minio-cluster.svc.cluster.local:9000", "is_secure": false, "verify": false}'

# Connection object describing the MinIO endpoint.
conn = Connection(
    conn_id=conn_id,
    conn_type=conn_type,
    host=host,
    login=login,
    password=password,
    extra=extra,
)

# Register the connection in the Airflow metadata database.
# BaseHook exposes no get_session(), so the ORM session is taken from
# airflow.settings. This code runs every time the DAG file is imported, so the
# row is inserted only when no connection with this conn_id exists yet;
# otherwise each import would try to add a duplicate.
session = settings.Session()
try:
    existing_conn = (
        session.query(Connection)
        .filter(Connection.conn_id == conn_id)
        .first()
    )
    if existing_conn is None:
        session.add(conn)
        session.commit()
finally:
    # Always release the database connection, even if the insert failed.
    session.close()

# S3 hook bound to the MinIO connection registered above.
s3_hook = S3Hook(aws_conn_id='minio_conn')
@dag()
def process_records():
    """
    Run the landing -> bronze -> silver -> gold flow and clean the buckets.

    Tasks, executed strictly in sequence:

    1. ``landing_to_bronze``: Spark job ``gen_bronze.py`` from the repo.
    2. ``dbt_setup``: copies the dbt project ``proj_teste`` to ``/tmp``.
    3. ``bronze_to_silver_to_gold``: runs ``dbt deps && dbt build`` there.
    4. ``clean_minio_buckets``: empties the gold, silver and bronze buckets
       with the MinIO client (``mc``).
    """
    # Process the data from landing into bronze.
    task_1 = SparkSubmitOperator(
        application=f"{repo_home}/gen_bronze.py",
        task_id="landing_to_bronze",
        packages="org.apache.hadoop:hadoop-aws:3.3.4",
    )

    # Set up dbt: start from a fresh copy of the project under /tmp.
    task_2 = BashOperator(
        task_id="dbt_setup",
        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
    )

    # Run dbt to transform the data from bronze to silver and gold.
    task_3 = BashOperator(
        task_id="bronze_to_silver_to_gold",
        cwd="/tmp/proj_teste",
        bash_command="dbt deps && dbt build",
    )

    # Clean the gold, silver and bronze buckets in MinIO.
    # `mc rm --recursive` refuses to run without `--force`, so the flag is
    # passed explicitly on every removal.
    # NOTE(review): the alias targets http://minio:9000 while the Airflow
    # connection elsewhere in this file uses the cluster FQDN -- confirm that
    # the short host name resolves from the worker.
    # NOTE(review): verify that the trailing `/*` selects the intended objects;
    # mc may treat it as a literal key prefix rather than a wildcard.
    cleanup_script = (
        "\n"
        "mc alias set myminio http://minio:9000 minioadmin minioadmin\n"
        "mc rm --recursive --force myminio/gold/*\n"
        "mc rm --recursive --force myminio/silver/*\n"
        "mc rm --recursive --force myminio/bronze/*\n"
    )
    task_4 = BashOperator(
        task_id="clean_minio_buckets",
        bash_command=cleanup_script,
    )

    # Execution order of the tasks (declared once; a second, partial chain
    # would only re-register the same dependencies).
    task_1 >> task_2 >> task_3 >> task_4


dag = process_records()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment