From 6a4629a60a457657a4aabeec29857a70c34ba6e2 Mon Sep 17 00:00:00 2001
From: lcd22 <lcd22@inf.ufpr.br>
Date: Thu, 6 Mar 2025 12:32:03 +0000
Subject: [PATCH] testing cleanup with spark

---
 clean.py | 38 ++++++++++++++++++++++++++++++++++++++
 dags.py  |  7 ++++++-
 2 files changed, 44 insertions(+), 1 deletion(-)
 create mode 100644 clean.py

diff --git a/clean.py b/clean.py
new file mode 100644
index 0000000..b525233
--- /dev/null
+++ b/clean.py
@@ -0,0 +1,38 @@
+from pyspark.sql import SparkSession
+
+# Spark configuration for accessing MinIO
+print("Starting Spark configuration...")
+spark = SparkSession.builder \
+    .appName("clean_directories") \
+    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \
+    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
+    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
+    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
+    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
+    .getOrCreate()
+
+# Function to clean the directories (buckets)
+def clean_bucket(bucket_path):
+    try:
+        print(f"Cleaning directory {bucket_path}...")
+        path = spark._jvm.org.apache.hadoop.fs.Path(bucket_path)
+        # Resolve the filesystem from the path's own URI (s3a), not the default FS
+        path.getFileSystem(spark._jsc.hadoopConfiguration()).delete(path, True)
+        print(f"Directory {bucket_path} cleaned successfully.")
+    except Exception as e:
+        print(f"Error cleaning directory {bucket_path}: {e}")
+
+# Directories to clean
+buckets = [
+    "s3a://bronze/warehouse",
+    "s3a://silver/warehouse",
+    "s3a://gold/warehouse"
+]
+
+# Clean the buckets
+for bucket in buckets:
+    clean_bucket(bucket)
+
+# Stop Spark
+spark.stop()
diff --git a/dags.py b/dags.py
index 6b820af..0f441ab 100644
--- a/dags.py
+++ b/dags.py
@@ -15,6 +15,11 @@ def process_records():
     """
     DAG that runs the landing -> bronze -> silver -> gold flow
     """
+    task_0 = SparkSubmitOperator(
+        application=f"{repo_home}/clean.py",
+        task_id="clean_directories",
+        packages="org.apache.hadoop:hadoop-aws:3.3.4"
+    )
 
     task_1 = SparkSubmitOperator(
         application=f"{repo_home}/gen_bronze.py",
@@ -33,7 +38,7 @@ def process_records():
         bash_command="dbt deps && dbt build",
     )
 
-    task_1 >> task_2 >> task_3
+    task_0 >> task_1 >> task_2 >> task_3
 
 
 dag = process_records()
--
GitLab
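
A quick way to check the effect of clean.py is to ask the same Hadoop
filesystem API whether the warehouse prefixes still exist after the job
runs. A minimal sketch, assuming the same MinIO endpoint and credentials
as clean.py; the file name verify_clean.py is illustrative, not part of
the patch:

    # verify_clean.py -- hypothetical companion script, not part of the patch
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("verify_clean")
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000")
        .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
        .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .getOrCreate()
    )

    conf = spark._jsc.hadoopConfiguration()
    for bucket in ["s3a://bronze/warehouse", "s3a://silver/warehouse", "s3a://gold/warehouse"]:
        path = spark._jvm.org.apache.hadoop.fs.Path(bucket)
        fs = path.getFileSystem(conf)
        # exists() returning False means the recursive delete removed the prefix
        print(bucket, "still exists" if fs.exists(path) else "cleaned")

    spark.stop()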
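
The explicit task_0 >> task_1 >> task_2 >> task_3 dependency added in
dags.py can equivalently be expressed with Airflow's chain() helper,
which stays readable as more tasks are appended to the pipeline. A
sketch using the same task objects as dags.py:

    from airflow.models.baseoperator import chain

    # Equivalent to: task_0 >> task_1 >> task_2 >> task_3
    chain(task_0, task_1, task_2, task_3)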