Skip to content
Snippets Groups Projects
Commit 32898e6f authored by edvs19's avatar edvs19
Browse files

teste

parent d65063f3
No related branches found
No related tags found
No related merge requests found
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType
# Configuração do Spark para acessar o MinIO
print("Iniciando a configuração do Spark...")
spark = SparkSession.builder \
.appName("Landig to Bronze") \
.config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \
.config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
.config("spark.sql.warehouse.dir", "s3a://landing/warehouse") \
.getOrCreate()
# Definição completa do esquema
print("Definindo o esquema dos dados...")
schema = StructType([
StructField("profissionalCNS", LongType(), True),
StructField("cboCodigo_2002", LongType(), True),
StructField("cnes", LongType(), True),
StructField("ine", LongType(), True),
StructField("dataAtendimento", DateType(), True),
StructField("condicoesDeSaude", StructType([
StructField("descricaoCausaInternacaoEm12Meses", StringType(), True),
StructField("descricaoOutraCondicao1", StringType(), True),
StructField("descricaoOutraCondicao2", StringType(), True),
StructField("descricaoOutraCondicao3", StringType(), True),
StructField("descricaoPlantasMedicinaisUsadas", ArrayType(StringType()), True),
StructField("doencaRespiratoria", ArrayType(StringType()), True),
StructField("doencaRins", ArrayType(StringType()), True),
StructField("maternidadeDeReferencia", StringType(), True),
StructField("situacaoPeso", StringType(), True),
StructField("statusEhDependenteAlcool", BooleanType(), True),
StructField("statusEhDependenteOutrasDrogas", BooleanType(), True),
StructField("statusEhFumante", BooleanType(), True),
StructField("statusEhGestante", BooleanType(), True),
StructField("statusEstaAcamado", BooleanType(), True),
StructField("statusEstaDomiciliado", BooleanType(), True),
StructField("statusTemDiabetes", BooleanType(), True),
StructField("statusTemDoencaRespiratoria", BooleanType(), True),
StructField("statusTemHanseniase", BooleanType(), True),
StructField("statusTemHipertensaoArterial", BooleanType(), True),
StructField("statusTemTeveCancer", BooleanType(), True),
StructField("statusTemTeveDoencasRins", BooleanType(), True),
StructField("statusTemTuberculose", BooleanType(), True),
StructField("statusTeveAvcDerrame", BooleanType(), True),
StructField("statusTeveDoencaCardiaca", BooleanType(), True),
StructField("statusTeveInfarto", BooleanType(), True),
StructField("statusTeveInternadoem12Meses", BooleanType(), True),
StructField("statusUsaOutrasPraticasIntegrativasOuComplementares", BooleanType(), True),
StructField("statusUsaPlantasMedicinais", BooleanType(), True),
StructField("statusDiagnosticoMental", StringType(), True)
]), True),
StructField("emSituacaoDeRua", StructType([
StructField("grauParentescoFamiliarFrequentado", StringType(), True),
StructField("higienePessoalSituacaoRua", ArrayType(StringType()), True),
StructField("origemAlimentoSituacaoRua", ArrayType(StringType()), True),
StructField("outraInstituicaoQueAcompanha", StringType(), True),
StructField("quantidadeAlimentacoesAoDiaSituacaoRua", StringType(), True),
StructField("statusAcompanhadoPorOutraInstituicao", BooleanType(), True),
StructField("statusPossuiReferenciaFamiliar", BooleanType(), True),
StructField("statusRecebeBeneficio", BooleanType(), True),
StructField("statusSituacaoRua", BooleanType(), True),
StructField("statusTemAcessoHigienePessoalSituacaoRua", BooleanType(), True),
StructField("statusVisitaFamiliarFrequentemente", BooleanType(), True),
StructField("tempoSituacaoRua", StringType(), True)
]), True),
StructField("identificacaoUsuarioCidadao", StructType([
StructField("nomeSocial", StringType(), True),
StructField("município", StringType(), True),
StructField("dataNascimentoCidadao", DateType(), True),
StructField("emailCidadao", StringType(), True),
StructField("nacionalidadeCidadao", StringType(), True),
StructField("nomeCidadao", StringType(), True),
StructField("nomeMaeCidadao", StringType(), True),
StructField("cnsCidadao", LongType(), True),
StructField("cnsResponsavelFamiliar", LongType(), True),
StructField("telefoneCelular", StringType(), True),
StructField("numeroNisPisPasep", LongType(), True),
StructField("paisNascimento", StringType(), True),
StructField("racaCorCidadao", StringType(), True),
StructField("sexoCidadao", StringType(), True),
StructField("statusEhResponsavel", BooleanType(), True),
StructField("etnia", StringType(), True),
StructField("nomePaiCidadao", StringType(), True),
StructField("desconheceNomePai", BooleanType(), True),
StructField("dtNaturalizacao", DateType(), True),
StructField("portariaNaturalizacao", StringType(), True),
StructField("dtEntradaBrasil", DateType(), True),
StructField("microarea", LongType(), True),
StructField("stForaArea", BooleanType(), True),
StructField("cpfCidadao", StringType(), True),
StructField("cpfResponsavelFamiliar", StringType(), True)
]), True),
StructField("InformacoesSocioDemograficas", StructType([
StructField("deficienciasCidadao", ArrayType(StringType()), True),
StructField("grauInstrucaoCidadao", StringType(), True),
StructField("ocupacao", StringType(), True),
StructField("orientacaoSexualCidadao", StringType(), True),
StructField("relacaoParentescoCidadao", StringType(), True),
StructField("situacaoMercadoTrabalhoCidadao", StringType(), True),
StructField("statusDesejaInformarOrientacaoSexual", BooleanType(), True),
StructField("statusFrequentaBenzedeira", BooleanType(), True),
StructField("statusFrequentaEscola", BooleanType(), True),
StructField("statusMembroPovoComunidadeTradicional", BooleanType(), True),
StructField("statusParticipaGrupoComunitario", BooleanType(), True),
StructField("statusPossuiPlanoSaudePrivado", BooleanType(), True),
StructField("statusTemAlgumaDeficiencia", BooleanType(), True),
StructField("identidadeGeneroCidadao", StringType(), True),
StructField("statusDesejaInformarIdentidadeGenero", BooleanType(), True),
StructField("responsavelPorCrianca", StringType(), True),
StructField("coPovoComunidadeTradicional", StringType(), True)
]), True),
StructField("statusTermoRecusaCadastroIndividualAtencaoBasica", BooleanType(), True),
StructField("saidaCidadaoCadastro", StructType([
StructField("motivoSaidaCidadao", StringType(), True),
StructField("dataObito", DateType(), True),
StructField("numeroDO", StringType(), True)
]), True)
])
print("Esquema definido com sucesso.")
# Carregar JSON com o esquema
input_path = "s3a://landing/" + sys.args[1]
output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet"
try:
print("Carregando dados do JSON...")
df = spark.read.schema(schema).json(input_path)
print("Dados carregados com sucesso.")
except Exception as e:
print(f"Erro ao carregar JSON: {e}")
spark.stop()
exit(1)
try:
print("Gravando dados em formato Parquet...")
df.write.mode("overwrite").parquet(output_path)
print("Dados gravados com sucesso.")
except Exception as e:
print(f"Erro ao gravar Parquet: {e}")
finally:
spark.stop()
...@@ -4,58 +4,44 @@ from airflow import DAG ...@@ -4,58 +4,44 @@ from airflow import DAG
from airflow.decorators import dag, task from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.models.param import Param
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.models import Variable
repo_home = '/opt/airflow/dags/repo' repo_home = '/opt/airflow/dags/repo'
# @dag( @dag()
# params={ def process_records():
# "input_file" : Param( """
# "nofile.jsonl", DAG que realiza o fluxo landing -> bronze -> silver -> gold
# "jsonl to be used as input in landing to bronze", """
# str # SparkSubmitOperator
# ) task_spark = SparkSubmitOperator(
# } task_id=f"landing_to_bronze",
# ) application=f"{repo_home}/gen_bronze.py",
# def process_records(params: dict): application_args=[
# """ Variable.get("input_file")
# DAG que realiza o fluxo landing -> bronze -> silver -> gold ],
# """ packages="org.apache.hadoop:hadoop-aws:3.3.4"
)
# task_1 = SparkSubmitOperator(
# application=f"{repo_home}/gen_bronze.py", # BashOperator 1 - dbt_setup
# task_id="landing_to_bronze", task_dbt_setup = BashOperator(
# packages = "org.apache.hadoop:hadoop-aws:3.3.4" task_id=f"dbt_setup",
# ) bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
)
# task_2 = BashOperator(
# task_id="dbt_setup", # BashOperator 2 - bronze_to_silver_to_gold
# bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", task_bronze_to_gold = BashOperator(
# ) task_id=f"bronze_to_silver_to_gold",
cwd="/tmp/proj_teste",
# task_3 = BashOperator( bash_command="dbt deps && dbt build",
# task_id="bronze_to_silver_to_gold", )
# cwd="/tmp/proj_teste",
# bash_command="dbt deps && dbt build",
# ) task_spark >> task_dbt_setup >> task_bronze_to_gold
# task_1 >> task_2 >> task_3 dag = process_records()
@dag
def teste():
print("Hello world")
task = EmptyOperator(
task_id="useless"
)
task
dag2 = teste()
# dag3 = run_tests()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment