Skip to content
Snippets Groups Projects
Commit 8aa60237 authored by lcd22's avatar lcd22
Browse files

tentativa de conectar ao minio

parent 74f81842
No related branches found
No related tags found
No related merge requests found
import datetime
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
# Directory that holds the Spark application (gen_bronze.py) and the dbt
# project (proj_teste) referenced by the DAG defined in this file.
repo_home = '/opt/airflow/dags/repo'
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.session import create_session

# Credentials and endpoint details for the MinIO connection.
# NOTE(review): secrets are hard-coded in the DAG file; consider supplying the
# connection through an AIRFLOW_CONN_MINIO_CONN environment variable or a
# secrets backend instead.
conn_id = 'minio_conn'
conn_type = 'S3'
host = 'http://minio.minio-cluster.svc.cluster.local:9000'
login = 'minioadmin'
password = 'minioadmin'
extra = '{"aws_access_key_id": "minioadmin", "aws_secret_access_key": "minioadmin", "region_name": "us-east-1", "endpoint_url": "http://minio.minio-cluster.svc.cluster.local:9000", "is_secure": false, "verify": false}'

# Build the connection object.
conn = Connection(
    conn_id=conn_id,
    conn_type=conn_type,
    host=host,
    login=login,
    password=password,
    extra=extra,
)

# Register the connection in the Airflow metadata database.
#
# BaseHook exposes no get_session(); create_session() yields a SQLAlchemy
# session that commits on success, rolls back on error and is always closed.
#
# This module is executed every time the DAG file is parsed, so the row is
# inserted only when no connection with this conn_id exists yet; otherwise
# every parse after the first would try to insert a duplicate.
with create_session() as session:
    already_registered = (
        session.query(Connection)
        .filter(Connection.conn_id == conn_id)
        .first()
    )
    if already_registered is None:
        session.add(conn)

s3_hook = S3Hook(aws_conn_id=conn_id)
# NOTE(review): @dag() is called without start_date/schedule arguments;
# confirm the Airflow version in use accepts that and that the resulting
# schedule is the intended one.
@dag()
def process_records():
    """
    DAG that runs the landing -> bronze -> silver -> gold flow and then
    cleans the MinIO buckets.
    """
    # Process the data from landing into bronze with a Spark job.
    task_1 = SparkSubmitOperator(
        application=f"{repo_home}/gen_bronze.py",
        task_id="landing_to_bronze",
        packages="org.apache.hadoop:hadoop-aws:3.3.4",
    )

    # Set up dbt: replace /tmp/proj_teste with a fresh copy of the project.
    task_2 = BashOperator(
        task_id="dbt_setup",
        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
    )

    # Run dbt to transform the data from bronze into silver and gold.
    task_3 = BashOperator(
        task_id="bronze_to_silver_to_gold",
        cwd="/tmp/proj_teste",
        bash_command="dbt deps && dbt build",
    )

    # Empty the gold, silver and bronze buckets in MinIO.
    #
    # `mc rm --recursive` refuses to run without `--force`, and `mc` does not
    # expand `*` wildcards, so each bucket path is passed directly together
    # with `--force` to remove every object under it.
    #
    # NOTE(review): this alias points at http://minio:9000 while the Airflow
    # connection in this file uses
    # http://minio.minio-cluster.svc.cluster.local:9000 -- confirm the short
    # hostname resolves from the worker.
    task_4 = BashOperator(
        task_id="clean_minio_buckets",
        bash_command="""
        mc alias set myminio http://minio:9000 minioadmin minioadmin
        mc rm --recursive --force myminio/gold/
        mc rm --recursive --force myminio/silver/
        mc rm --recursive --force myminio/bronze/
        """,
    )

    # Execution order, declared once: Spark ingest -> dbt setup -> dbt build
    # -> bucket cleanup.
    task_1 >> task_2 >> task_3 >> task_4


dag = process_records()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment