Skip to content
Snippets Groups Projects
Commit 0173220e authored by lcd22's avatar lcd22
Browse files

add clean buckets

parent ff5f08f3
No related merge requests found
...@@ -10,6 +10,28 @@ from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOpe ...@@ -10,6 +10,28 @@ from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOpe
repo_home = '/opt/airflow/dags/repo' repo_home = '/opt/airflow/dags/repo'
# Função para limpar o buckets do Minio
def clean_buckets():
# Configurar cliente do Minio
minio_client = Minio(
"http://minio.minio-cluster.svc.cluster.local:9000",
access_key="minioadmin",
secret_key="minioadmin",
secure=True
)
bucket_names = ["bronze", "silver", "gold"]
for bucket_name in bucket_names:
try:
# Listar objetos no bucket e remover
for obj in minio_client.list_objects(bucket_name):
minio_client.remove_object(bucket_name, obj.object_name)
print(f"Objeto {obj.object_name} removido com sucesso!")
except ResponseError as err:
print(f"Erro ao limpar o bucket: {err}")
@dag() @dag()
def process_records(): def process_records():
""" """
...@@ -33,7 +55,12 @@ def process_records(): ...@@ -33,7 +55,12 @@ def process_records():
bash_command="dbt deps && dbt build", bash_command="dbt deps && dbt build",
) )
task_1 >> task_2 >> task_3 task_4 = PythonOperator(
task_id="clean_buckets",
python_callable=clean_buckets
)
task_1 >> task_2 >> task_3 >> task_4
dag = process_records() dag = process_records()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment