Skip to content
Snippets Groups Projects
Commit 0ec0f117 authored by lcd22's avatar lcd22
Browse files

volta versão

parent 7c91518f
Branches
No related tags found
No related merge requests found
from pyspark.sql import SparkSession
# Configuração do Spark para acessar o MinIO
print("Iniciando a configuração do Spark...")
spark = SparkSession.builder \
.appName("clean_directories") \
.config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \
.config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
.config("spark.hadoop.fs.defaultFS", "s3a://") \ # Garantir que o fs padrão seja o s3a
.getOrCreate()
# Função para limpar os diretórios (buckets)
def clean_bucket(bucket_path):
try:
print(f"Limpando o diretório {bucket_path}...")
# Certifique-se de que o caminho S3 seja tratado corretamente
if not bucket_path.startswith("s3a://"):
print(f"Caminho inválido: {bucket_path}. Deve começar com 's3a://'.")
return
# Usando a API Hadoop FileSystem para deletar o diretório S3
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
path = spark._jvm.org.apache.hadoop.fs.Path(bucket_path)
# Apagar o diretório S3 (o parâmetro 'True' remove recursivamente)
if fs.exists(path):
fs.delete(path, True)
print(f"Diretório {bucket_path} limpo com sucesso.")
else:
print(f"O diretório {bucket_path} não existe.")
except Exception as e:
print(f"Erro ao limpar o diretório {bucket_path}: {e}")
# Diretórios para limpar
buckets = [
"s3a://bronze/warehouse",
"s3a://silver/warehouse",
"s3a://gold/warehouse"
]
# Limpar os buckets
for bucket in buckets:
clean_bucket(bucket)
# Finalizar o Spark
spark.stop()
...@@ -15,11 +15,6 @@ def process_records(): ...@@ -15,11 +15,6 @@ def process_records():
""" """
DAG que realiza o fluxo landing -> bronze -> silver -> gold DAG que realiza o fluxo landing -> bronze -> silver -> gold
""" """
task_0 = SparkSubmitOperator(
application=f"{repo_home}/clean.py",
task_id="clean_directories",
packages = "org.apache.hadoop:hadoop-aws:3.3.4"
)
task_1 = SparkSubmitOperator( task_1 = SparkSubmitOperator(
application=f"{repo_home}/gen_bronze.py", application=f"{repo_home}/gen_bronze.py",
...@@ -38,7 +33,7 @@ def process_records(): ...@@ -38,7 +33,7 @@ def process_records():
bash_command="dbt deps && dbt build", bash_command="dbt deps && dbt build",
) )
task_0 >> task_1 >> task_2 >> task_3 task_1 >> task_2 >> task_3
dag = process_records() dag = process_records()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment