diff --git a/clean.py b/clean.py deleted file mode 100644 index cc7ad37c6e3b1caa84a435723b7b47269898e663..0000000000000000000000000000000000000000 --- a/clean.py +++ /dev/null @@ -1,52 +0,0 @@ -from pyspark.sql import SparkSession - -# Configuração do Spark para acessar o MinIO -print("Iniciando a configuração do Spark...") -spark = SparkSession.builder \ - .appName("clean_directories") \ - .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \ - .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \ - .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \ - .config("spark.hadoop.fs.s3a.path.style.access", "true") \ - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ - .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \ - .config("spark.hadoop.fs.defaultFS", "s3a://") \ # Garantir que o fs padrão seja o s3a - .getOrCreate() - -# Função para limpar os diretórios (buckets) -def clean_bucket(bucket_path): - try: - print(f"Limpando o diretório {bucket_path}...") - - # Certifique-se de que o caminho S3 seja tratado corretamente - if not bucket_path.startswith("s3a://"): - print(f"Caminho inválido: {bucket_path}. Deve começar com 's3a://'.") - return - - # Usando a API Hadoop FileSystem para deletar o diretório S3 - fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()) - path = spark._jvm.org.apache.hadoop.fs.Path(bucket_path) - - # Apagar o diretório S3 (o parâmetro 'True' remove recursivamente) - if fs.exists(path): - fs.delete(path, True) - print(f"Diretório {bucket_path} limpo com sucesso.") - else: - print(f"O diretório {bucket_path} não existe.") - - except Exception as e: - print(f"Erro ao limpar o diretório {bucket_path}: {e}") - -# Diretórios para limpar -buckets = [ - "s3a://bronze/warehouse", - "s3a://silver/warehouse", - "s3a://gold/warehouse" -] - -# Limpar os buckets -for bucket in buckets: - clean_bucket(bucket) - -# Finalizar o Spark -spark.stop() diff --git a/dags.py b/dags.py index 0f441aba2473946fbf87676e5a3da7408a0e1ba3..6b820afaf8271f084060a620a24cea060b6ebcae 100644 --- a/dags.py +++ b/dags.py @@ -15,11 +15,6 @@ def process_records(): """ DAG que realiza o fluxo landing -> bronze -> silver -> gold """ - task_0 = SparkSubmitOperator( - application=f"{repo_home}/clean.py", - task_id="clean_directories", - packages = "org.apache.hadoop:hadoop-aws:3.3.4" - ) task_1 = SparkSubmitOperator( application=f"{repo_home}/gen_bronze.py", @@ -38,7 +33,7 @@ def process_records(): bash_command="dbt deps && dbt build", ) - task_0 >> task_1 >> task_2 >> task_3 + task_1 >> task_2 >> task_3 dag = process_records()