From 0173220e65edfab95412a77da7f0968c4d870ff6 Mon Sep 17 00:00:00 2001
From: lcd22 <lcd22@inf.ufpr.br>
Date: Thu, 6 Mar 2025 01:02:40 +0000
Subject: [PATCH] add clean buckets

---
 dags.py | 38 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)

diff --git a/dags.py b/dags.py
index 6b820af..b37fe9d 100644
--- a/dags.py
+++ b/dags.py
@@ -10,6 +10,37 @@ from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOpe
 repo_home = '/opt/airflow/dags/repo'
 
 
+def clean_buckets():
+    """Remove every object from the bronze, silver and gold MinIO buckets.
+
+    Buckets are emptied one at a time. A ``ResponseError`` raised while
+    cleaning a bucket is printed and the loop moves on to the next bucket.
+    """
+    # The Minio client takes a bare "host:port" endpoint; a URL with a
+    # scheme is rejected, and TLS is selected through ``secure`` instead.
+    # NOTE(review): the original URL used http://, so TLS is off - confirm.
+    # NOTE(review): credentials are hard-coded; consider a secret store.
+    minio_client = Minio(
+        "minio.minio-cluster.svc.cluster.local:9000",
+        access_key="minioadmin",
+        secret_key="minioadmin",
+        secure=False,
+    )
+
+    bucket_names = ["bronze", "silver", "gold"]
+
+    for bucket_name in bucket_names:
+        try:
+            # recursive=True descends into prefixes ("folders"); the default
+            # listing stops at the top level and leaves nested objects behind.
+            for obj in minio_client.list_objects(bucket_name, recursive=True):
+                minio_client.remove_object(bucket_name, obj.object_name)
+                print(f"Objeto {obj.object_name} removido com sucesso!")
+        # NOTE(review): minio>=7 replaced ResponseError with S3Error - verify.
+        except ResponseError as err:
+            print(f"Erro ao limpar o bucket: {err}")
+
+
 @dag()
 def process_records():
     """
@@ -33,7 +64,12 @@ def process_records():
         bash_command="dbt deps && dbt build",
     )
 
-    task_1 >> task_2 >> task_3
+    task_4 = PythonOperator(
+        task_id="clean_buckets",
+        python_callable=clean_buckets
+    )
+
+    task_1 >> task_2 >> task_3 >> task_4
 
 
 dag = process_records()
-- 
GitLab