From 32898e6f06e1f770eb7793c9aa8f9a8540bfb9f7 Mon Sep 17 00:00:00 2001
From: edvs19 <edvs19@inf.ufpr.br>
Date: Thu, 6 Mar 2025 14:45:15 -0300
Subject: [PATCH] teste

---
 gen_bronze.py | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++
 teste_dag.py  |  84 ++++++++++++-----------------
 2 files changed, 181 insertions(+), 49 deletions(-)

diff --git a/gen_bronze.py b/gen_bronze.py
index e69de29..62ed0f8 100644
--- a/gen_bronze.py
+++ b/gen_bronze.py
@@ -0,0 +1,146 @@
+import sys
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType
+
+# Configuração do Spark para acessar o MinIO
+print("Iniciando a configuração do Spark...")
+spark = SparkSession.builder \
+    .appName("Landig to Bronze") \
+    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \
+    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
+    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
+    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
+    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
+    .config("spark.sql.warehouse.dir", "s3a://landing/warehouse") \
+    .getOrCreate()
+
+# Definição completa do esquema
+print("Definindo o esquema dos dados...")
+schema = StructType([
+    StructField("profissionalCNS", LongType(), True),
+    StructField("cboCodigo_2002", LongType(), True),
+    StructField("cnes", LongType(), True),
+    StructField("ine", LongType(), True),
+    StructField("dataAtendimento", DateType(), True),
+    StructField("condicoesDeSaude", StructType([
+        StructField("descricaoCausaInternacaoEm12Meses", StringType(), True),
+        StructField("descricaoOutraCondicao1", StringType(), True),
+        StructField("descricaoOutraCondicao2", StringType(), True),
+        StructField("descricaoOutraCondicao3", StringType(), True),
+        StructField("descricaoPlantasMedicinaisUsadas", ArrayType(StringType()), True),
+        StructField("doencaRespiratoria", ArrayType(StringType()), True),
+        StructField("doencaRins", ArrayType(StringType()), True),
+        StructField("maternidadeDeReferencia", StringType(), True),
+        StructField("situacaoPeso", StringType(), True),
+        StructField("statusEhDependenteAlcool", BooleanType(), True),
+        StructField("statusEhDependenteOutrasDrogas", BooleanType(), True),
+        StructField("statusEhFumante", BooleanType(), True),
+        StructField("statusEhGestante", BooleanType(), True),
+        StructField("statusEstaAcamado", BooleanType(), True),
+        StructField("statusEstaDomiciliado", BooleanType(), True),
+        StructField("statusTemDiabetes", BooleanType(), True),
+        StructField("statusTemDoencaRespiratoria", BooleanType(), True),
+        StructField("statusTemHanseniase", BooleanType(), True),
+        StructField("statusTemHipertensaoArterial", BooleanType(), True),
+        StructField("statusTemTeveCancer", BooleanType(), True),
+        StructField("statusTemTeveDoencasRins", BooleanType(), True),
+        StructField("statusTemTuberculose", BooleanType(), True),
+        StructField("statusTeveAvcDerrame", BooleanType(), True),
+        StructField("statusTeveDoencaCardiaca", BooleanType(), True),
+        StructField("statusTeveInfarto", BooleanType(), True),
+        StructField("statusTeveInternadoem12Meses", BooleanType(), True),
+        StructField("statusUsaOutrasPraticasIntegrativasOuComplementares", BooleanType(), True),
+        StructField("statusUsaPlantasMedicinais", BooleanType(), True),
+        StructField("statusDiagnosticoMental", StringType(), True)
+    ]), True),
+    StructField("emSituacaoDeRua", StructType([
+        StructField("grauParentescoFamiliarFrequentado", StringType(), True),
+        StructField("higienePessoalSituacaoRua", ArrayType(StringType()), True),
+        StructField("origemAlimentoSituacaoRua", ArrayType(StringType()), True),
+        StructField("outraInstituicaoQueAcompanha", StringType(), True),
+        StructField("quantidadeAlimentacoesAoDiaSituacaoRua", StringType(), True),
+        StructField("statusAcompanhadoPorOutraInstituicao", BooleanType(), True),
+        StructField("statusPossuiReferenciaFamiliar", BooleanType(), True),
+        StructField("statusRecebeBeneficio", BooleanType(), True),
+        StructField("statusSituacaoRua", BooleanType(), True),
+        StructField("statusTemAcessoHigienePessoalSituacaoRua", BooleanType(), True),
+        StructField("statusVisitaFamiliarFrequentemente", BooleanType(), True),
+        StructField("tempoSituacaoRua", StringType(), True)
+    ]), True),
+    StructField("identificacaoUsuarioCidadao", StructType([
+        StructField("nomeSocial", StringType(), True),
+        StructField("município", StringType(), True),
+        StructField("dataNascimentoCidadao", DateType(), True),
+        StructField("emailCidadao", StringType(), True),
+        StructField("nacionalidadeCidadao", StringType(), True),
+        StructField("nomeCidadao", StringType(), True),
+        StructField("nomeMaeCidadao", StringType(), True),
+        StructField("cnsCidadao", LongType(), True),
+        StructField("cnsResponsavelFamiliar", LongType(), True),
+        StructField("telefoneCelular", StringType(), True),
+        StructField("numeroNisPisPasep", LongType(), True),
+        StructField("paisNascimento", StringType(), True),
+        StructField("racaCorCidadao", StringType(), True),
+        StructField("sexoCidadao", StringType(), True),
+        StructField("statusEhResponsavel", BooleanType(), True),
+        StructField("etnia", StringType(), True),
+        StructField("nomePaiCidadao", StringType(), True),
+        StructField("desconheceNomePai", BooleanType(), True),
+        StructField("dtNaturalizacao", DateType(), True),
+        StructField("portariaNaturalizacao", StringType(), True),
+        StructField("dtEntradaBrasil", DateType(), True),
+        StructField("microarea", LongType(), True),
+        StructField("stForaArea", BooleanType(), True),
+        StructField("cpfCidadao", StringType(), True),
+        StructField("cpfResponsavelFamiliar", StringType(), True)
+    ]), True),
+    StructField("InformacoesSocioDemograficas", StructType([
+        StructField("deficienciasCidadao", ArrayType(StringType()), True),
+        StructField("grauInstrucaoCidadao", StringType(), True),
+        StructField("ocupacao", StringType(), True),
+        StructField("orientacaoSexualCidadao", StringType(), True),
+        StructField("relacaoParentescoCidadao", StringType(), True),
+        StructField("situacaoMercadoTrabalhoCidadao", StringType(), True),
+        StructField("statusDesejaInformarOrientacaoSexual", BooleanType(), True),
+        StructField("statusFrequentaBenzedeira", BooleanType(), True),
+        StructField("statusFrequentaEscola", BooleanType(), True),
+        StructField("statusMembroPovoComunidadeTradicional", BooleanType(), True),
+        StructField("statusParticipaGrupoComunitario", BooleanType(), True),
+        StructField("statusPossuiPlanoSaudePrivado", BooleanType(), True),
+        StructField("statusTemAlgumaDeficiencia", BooleanType(), True),
+        StructField("identidadeGeneroCidadao", StringType(), True),
+        StructField("statusDesejaInformarIdentidadeGenero", BooleanType(), True),
+        StructField("responsavelPorCrianca", StringType(), True),
+        StructField("coPovoComunidadeTradicional", StringType(), True)
+    ]), True),
+    StructField("statusTermoRecusaCadastroIndividualAtencaoBasica", BooleanType(), True),
+    StructField("saidaCidadaoCadastro", StructType([
+        StructField("motivoSaidaCidadao", StringType(), True),
+        StructField("dataObito", DateType(), True),
+        StructField("numeroDO", StringType(), True)
+    ]), True)
+])
+
+
+print("Esquema definido com sucesso.")
+# Carregar JSON com o esquema
+input_path = "s3a://landing/" + sys.args[1]
+output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet"
+try:
+    print("Carregando dados do JSON...")
+    df = spark.read.schema(schema).json(input_path)
+    print("Dados carregados com sucesso.")
+except Exception as e:
+    print(f"Erro ao carregar JSON: {e}")
+    spark.stop()
+    exit(1)
+
+try:
+    print("Gravando dados em formato Parquet...")
+    df.write.mode("overwrite").parquet(output_path)
+    print("Dados gravados com sucesso.")
+except Exception as e:
+    print(f"Erro ao gravar Parquet: {e}")
+finally:
+    spark.stop()
diff --git a/teste_dag.py b/teste_dag.py
index 809e40f..caa9438 100644
--- a/teste_dag.py
+++ b/teste_dag.py
@@ -4,58 +4,44 @@ from airflow import DAG
 from airflow.decorators import dag, task
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.bash import BashOperator
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
-from airflow.models.param import Param
 
 from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
 
+from airflow.models import Variable
+
+
 
 repo_home = '/opt/airflow/dags/repo'
 
-# @dag(
-#     params={
-#       "input_file" : Param(
-#                       "nofile.jsonl",
-#                       "jsonl to be used as input in landing to bronze",
-#                       str
-#                     )
-#     }
-# )
-# def process_records(params: dict):
-#   """
-#   DAG que realiza o fluxo landing -> bronze -> silver -> gold
-#   """
-
-#   task_1 = SparkSubmitOperator(
-#     application=f"{repo_home}/gen_bronze.py",
-#     task_id="landing_to_bronze",
-#     packages = "org.apache.hadoop:hadoop-aws:3.3.4"
-#   )
-
-#   task_2 = BashOperator(
-#     task_id="dbt_setup",
-#     bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
-#   )
-  
-#   task_3 = BashOperator(
-#     task_id="bronze_to_silver_to_gold",
-#     cwd="/tmp/proj_teste",
-#     bash_command="dbt deps && dbt build",
-#   )
-
-#   task_1 >> task_2 >> task_3
-
-
-@dag
-def teste():
-  print("Hello world")
-
-  task = EmptyOperator(
-    task_id="useless"
-  )
-
-  task
-
-
-dag2 = teste()
-# dag3 = run_tests()
+@dag()
+def process_records():
+    """
+    DAG que realiza o fluxo landing -> bronze -> silver -> gold
+    """
+    # SparkSubmitOperator
+    task_spark = SparkSubmitOperator(
+        task_id=f"landing_to_bronze",
+        application=f"{repo_home}/gen_bronze.py",
+        application_args=[
+            Variable.get("input_file")
+        ],
+        packages="org.apache.hadoop:hadoop-aws:3.3.4"
+    )
+
+    # BashOperator 1 - dbt_setup
+    task_dbt_setup = BashOperator(
+        task_id=f"dbt_setup",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    # BashOperator 2 - bronze_to_silver_to_gold
+    task_bronze_to_gold = BashOperator(
+        task_id=f"bronze_to_silver_to_gold",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+
+    task_spark >> task_dbt_setup >> task_bronze_to_gold
+
+dag = process_records()
-- 
GitLab