From 9ea0555fc53b46a7ff4587a784280909c15b1c3d Mon Sep 17 00:00:00 2001 From: lcd22 <lcd22@inf.ufpr.br> Date: Thu, 6 Mar 2025 14:50:52 +0000 Subject: [PATCH] =?UTF-8?q?v=C3=A1rios=20grupo=20de=20fichas=20de=20uma=20?= =?UTF-8?q?vez?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags.py | 43 +++++++-- gen_bronze_10000.py | 145 ++++++++++++++++++++++++++++ gen_bronze_100000.py | 145 ++++++++++++++++++++++++++++ gen_bronze_20000.py | 145 ++++++++++++++++++++++++++++ gen_bronze_200000.py | 145 ++++++++++++++++++++++++++++ gen_bronze.py => gen_bronze_5000.py | 2 +- gen_bronze_50000.py | 145 ++++++++++++++++++++++++++++ 7 files changed, 762 insertions(+), 8 deletions(-) create mode 100644 gen_bronze_10000.py create mode 100644 gen_bronze_100000.py create mode 100644 gen_bronze_20000.py create mode 100644 gen_bronze_200000.py rename gen_bronze.py => gen_bronze_5000.py (99%) create mode 100644 gen_bronze_50000.py diff --git a/dags.py b/dags.py index 6b820af..bf437ef 100644 --- a/dags.py +++ b/dags.py @@ -16,12 +16,6 @@ def process_records(): DAG que realiza o fluxo landing -> bronze -> silver -> gold """ - task_1 = SparkSubmitOperator( - application=f"{repo_home}/gen_bronze.py", - task_id="landing_to_bronze", - packages = "org.apache.hadoop:hadoop-aws:3.3.4" - ) - task_2 = BashOperator( task_id="dbt_setup", bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", @@ -33,7 +27,42 @@ def process_records(): bash_command="dbt deps && dbt build", ) - task_1 >> task_2 >> task_3 + task_4 = SparkSubmitOperator( + application=f"{repo_home}/gen_bronze_5000.py", + task_id="landing_to_bronze_5000", + packages = "org.apache.hadoop:hadoop-aws:3.3.4" + ) + + task_5 = SparkSubmitOperator( + application=f"{repo_home}/gen_bronze_10000.py", + task_id="landing_to_bronze_10000", + packages = "org.apache.hadoop:hadoop-aws:3.3.4" + ) + + task_6 = SparkSubmitOperator( + application=f"{repo_home}/gen_bronze_20000.py", + task_id="landing_to_bronze_20000", + packages = "org.apache.hadoop:hadoop-aws:3.3.4" + ) + + task_7 = SparkSubmitOperator( + application=f"{repo_home}/gen_bronze_50000.py", + task_id="landing_to_bronze_50000", + packages = "org.apache.hadoop:hadoop-aws:3.3.4" + ) + + task_8 = SparkSubmitOperator( + application=f"{repo_home}/gen_bronze_100000.py", + task_id="landing_to_bronze_100000", + packages = "org.apache.hadoop:hadoop-aws:3.3.4" + ) + + task_9 = SparkSubmitOperator( + application=f"{repo_home}/gen_bronze_200000.py", + task_id="landing_to_bronze_200000", + packages = "org.apache.hadoop:hadoop-aws:3.3.4" + ) + task_4 >> task_2 >> task_3 >> task_5 >> task_2 >> task_3 >> task_6 >> task_2 >> task_3 >> task_7 >> task_2 >> task_3 >> task_8 >> task_2 >> task_3 >> task_9 >> task_2 >> task_3 dag = process_records() diff --git a/gen_bronze_10000.py b/gen_bronze_10000.py new file mode 100644 index 0000000..7f6d0b7 --- /dev/null +++ b/gen_bronze_10000.py @@ -0,0 +1,145 @@ +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType + +# Configuração do Spark para acessar o MinIO +print("Iniciando a configuração do Spark...") +spark = SparkSession.builder \ + .appName("Landig to Bronze") \ + .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \ + .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \ + .config("spark.sql.warehouse.dir", "s3a://landing/warehouse") \ + .getOrCreate() + +# Definição completa do esquema +print("Definindo o esquema dos dados...") +schema = StructType([ + StructField("profissionalCNS", LongType(), True), + StructField("cboCodigo_2002", LongType(), True), + StructField("cnes", LongType(), True), + StructField("ine", LongType(), True), + StructField("dataAtendimento", DateType(), True), + StructField("condicoesDeSaude", StructType([ + StructField("descricaoCausaInternacaoEm12Meses", StringType(), True), + StructField("descricaoOutraCondicao1", StringType(), True), + StructField("descricaoOutraCondicao2", StringType(), True), + StructField("descricaoOutraCondicao3", StringType(), True), + StructField("descricaoPlantasMedicinaisUsadas", ArrayType(StringType()), True), + StructField("doencaRespiratoria", ArrayType(StringType()), True), + StructField("doencaRins", ArrayType(StringType()), True), + StructField("maternidadeDeReferencia", StringType(), True), + StructField("situacaoPeso", StringType(), True), + StructField("statusEhDependenteAlcool", BooleanType(), True), + StructField("statusEhDependenteOutrasDrogas", BooleanType(), True), + StructField("statusEhFumante", BooleanType(), True), + StructField("statusEhGestante", BooleanType(), True), + StructField("statusEstaAcamado", BooleanType(), True), + StructField("statusEstaDomiciliado", BooleanType(), True), + StructField("statusTemDiabetes", BooleanType(), True), + StructField("statusTemDoencaRespiratoria", BooleanType(), True), + StructField("statusTemHanseniase", BooleanType(), True), + StructField("statusTemHipertensaoArterial", BooleanType(), True), + StructField("statusTemTeveCancer", BooleanType(), True), + StructField("statusTemTeveDoencasRins", BooleanType(), True), + StructField("statusTemTuberculose", BooleanType(), True), + StructField("statusTeveAvcDerrame", BooleanType(), True), + StructField("statusTeveDoencaCardiaca", BooleanType(), True), + StructField("statusTeveInfarto", BooleanType(), True), + StructField("statusTeveInternadoem12Meses", BooleanType(), True), + StructField("statusUsaOutrasPraticasIntegrativasOuComplementares", BooleanType(), True), + StructField("statusUsaPlantasMedicinais", BooleanType(), True), + StructField("statusDiagnosticoMental", StringType(), True) + ]), True), + StructField("emSituacaoDeRua", StructType([ + StructField("grauParentescoFamiliarFrequentado", StringType(), True), + StructField("higienePessoalSituacaoRua", ArrayType(StringType()), True), + StructField("origemAlimentoSituacaoRua", ArrayType(StringType()), True), + StructField("outraInstituicaoQueAcompanha", StringType(), True), + StructField("quantidadeAlimentacoesAoDiaSituacaoRua", StringType(), True), + StructField("statusAcompanhadoPorOutraInstituicao", BooleanType(), True), + StructField("statusPossuiReferenciaFamiliar", BooleanType(), True), + StructField("statusRecebeBeneficio", BooleanType(), True), + StructField("statusSituacaoRua", BooleanType(), True), + StructField("statusTemAcessoHigienePessoalSituacaoRua", BooleanType(), True), + StructField("statusVisitaFamiliarFrequentemente", BooleanType(), True), + StructField("tempoSituacaoRua", StringType(), True) + ]), True), + StructField("identificacaoUsuarioCidadao", StructType([ + StructField("nomeSocial", StringType(), True), + StructField("município", StringType(), True), + StructField("dataNascimentoCidadao", DateType(), True), + StructField("emailCidadao", StringType(), True), + StructField("nacionalidadeCidadao", StringType(), True), + StructField("nomeCidadao", StringType(), True), + StructField("nomeMaeCidadao", StringType(), True), + StructField("cnsCidadao", LongType(), True), + StructField("cnsResponsavelFamiliar", LongType(), True), + StructField("telefoneCelular", StringType(), True), + StructField("numeroNisPisPasep", LongType(), True), + StructField("paisNascimento", StringType(), True), + StructField("racaCorCidadao", StringType(), True), + StructField("sexoCidadao", StringType(), True), + StructField("statusEhResponsavel", BooleanType(), True), + StructField("etnia", StringType(), True), + StructField("nomePaiCidadao", StringType(), True), + StructField("desconheceNomePai", BooleanType(), True), + StructField("dtNaturalizacao", DateType(), True), + StructField("portariaNaturalizacao", StringType(), True), + StructField("dtEntradaBrasil", DateType(), True), + StructField("microarea", LongType(), True), + StructField("stForaArea", BooleanType(), True), + StructField("cpfCidadao", StringType(), True), + StructField("cpfResponsavelFamiliar", StringType(), True) + ]), True), + StructField("InformacoesSocioDemograficas", StructType([ + StructField("deficienciasCidadao", ArrayType(StringType()), True), + StructField("grauInstrucaoCidadao", StringType(), True), + StructField("ocupacao", StringType(), True), + StructField("orientacaoSexualCidadao", StringType(), True), + StructField("relacaoParentescoCidadao", StringType(), True), + StructField("situacaoMercadoTrabalhoCidadao", StringType(), True), + StructField("statusDesejaInformarOrientacaoSexual", BooleanType(), True), + StructField("statusFrequentaBenzedeira", BooleanType(), True), + StructField("statusFrequentaEscola", BooleanType(), True), + StructField("statusMembroPovoComunidadeTradicional", BooleanType(), True), + StructField("statusParticipaGrupoComunitario", BooleanType(), True), + StructField("statusPossuiPlanoSaudePrivado", BooleanType(), True), + StructField("statusTemAlgumaDeficiencia", BooleanType(), True), + StructField("identidadeGeneroCidadao", StringType(), True), + StructField("statusDesejaInformarIdentidadeGenero", BooleanType(), True), + StructField("responsavelPorCrianca", StringType(), True), + StructField("coPovoComunidadeTradicional", StringType(), True) + ]), True), + StructField("statusTermoRecusaCadastroIndividualAtencaoBasica", BooleanType(), True), + StructField("saidaCidadaoCadastro", StructType([ + StructField("motivoSaidaCidadao", StringType(), True), + StructField("dataObito", DateType(), True), + StructField("numeroDO", StringType(), True) + ]), True) +]) + + +print("Esquema definido com sucesso.") +# Carregar JSON com o esquema +input_path = "s3a://landing/warehouse/fichas_cadastro_individual_10000.jsonl" +output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" +try: + print("Carregando dados do JSON...") + df = spark.read.schema(schema).json(input_path) + print("Dados carregados com sucesso.") +except Exception as e: + print(f"Erro ao carregar JSON: {e}") + spark.stop() + exit(1) + +try: + print("Gravando dados em formato Parquet...") + df.write.mode("overwrite").parquet(output_path) + print("Dados gravados com sucesso.") +except Exception as e: + print(f"Erro ao gravar Parquet: {e}") +finally: + spark.stop() diff --git a/gen_bronze_100000.py b/gen_bronze_100000.py new file mode 100644 index 0000000..750ecfa --- /dev/null +++ b/gen_bronze_100000.py @@ -0,0 +1,145 @@ +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType + +# Configuração do Spark para acessar o MinIO +print("Iniciando a configuração do Spark...") +spark = SparkSession.builder \ + .appName("Landig to Bronze") \ + .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \ + .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \ + .config("spark.sql.warehouse.dir", "s3a://landing/warehouse") \ + .getOrCreate() + +# Definição completa do esquema +print("Definindo o esquema dos dados...") +schema = StructType([ + StructField("profissionalCNS", LongType(), True), + StructField("cboCodigo_2002", LongType(), True), + StructField("cnes", LongType(), True), + StructField("ine", LongType(), True), + StructField("dataAtendimento", DateType(), True), + StructField("condicoesDeSaude", StructType([ + StructField("descricaoCausaInternacaoEm12Meses", StringType(), True), + StructField("descricaoOutraCondicao1", StringType(), True), + StructField("descricaoOutraCondicao2", StringType(), True), + StructField("descricaoOutraCondicao3", StringType(), True), + StructField("descricaoPlantasMedicinaisUsadas", ArrayType(StringType()), True), + StructField("doencaRespiratoria", ArrayType(StringType()), True), + StructField("doencaRins", ArrayType(StringType()), True), + StructField("maternidadeDeReferencia", StringType(), True), + StructField("situacaoPeso", StringType(), True), + StructField("statusEhDependenteAlcool", BooleanType(), True), + StructField("statusEhDependenteOutrasDrogas", BooleanType(), True), + StructField("statusEhFumante", BooleanType(), True), + StructField("statusEhGestante", BooleanType(), True), + StructField("statusEstaAcamado", BooleanType(), True), + StructField("statusEstaDomiciliado", BooleanType(), True), + StructField("statusTemDiabetes", BooleanType(), True), + StructField("statusTemDoencaRespiratoria", BooleanType(), True), + StructField("statusTemHanseniase", BooleanType(), True), + StructField("statusTemHipertensaoArterial", BooleanType(), True), + StructField("statusTemTeveCancer", BooleanType(), True), + StructField("statusTemTeveDoencasRins", BooleanType(), True), + StructField("statusTemTuberculose", BooleanType(), True), + StructField("statusTeveAvcDerrame", BooleanType(), True), + StructField("statusTeveDoencaCardiaca", BooleanType(), True), + StructField("statusTeveInfarto", BooleanType(), True), + StructField("statusTeveInternadoem12Meses", BooleanType(), True), + StructField("statusUsaOutrasPraticasIntegrativasOuComplementares", BooleanType(), True), + StructField("statusUsaPlantasMedicinais", BooleanType(), True), + StructField("statusDiagnosticoMental", StringType(), True) + ]), True), + StructField("emSituacaoDeRua", StructType([ + StructField("grauParentescoFamiliarFrequentado", StringType(), True), + StructField("higienePessoalSituacaoRua", ArrayType(StringType()), True), + StructField("origemAlimentoSituacaoRua", ArrayType(StringType()), True), + StructField("outraInstituicaoQueAcompanha", StringType(), True), + StructField("quantidadeAlimentacoesAoDiaSituacaoRua", StringType(), True), + StructField("statusAcompanhadoPorOutraInstituicao", BooleanType(), True), + StructField("statusPossuiReferenciaFamiliar", BooleanType(), True), + StructField("statusRecebeBeneficio", BooleanType(), True), + StructField("statusSituacaoRua", BooleanType(), True), + StructField("statusTemAcessoHigienePessoalSituacaoRua", BooleanType(), True), + StructField("statusVisitaFamiliarFrequentemente", BooleanType(), True), + StructField("tempoSituacaoRua", StringType(), True) + ]), True), + StructField("identificacaoUsuarioCidadao", StructType([ + StructField("nomeSocial", StringType(), True), + StructField("município", StringType(), True), + StructField("dataNascimentoCidadao", DateType(), True), + StructField("emailCidadao", StringType(), True), + StructField("nacionalidadeCidadao", StringType(), True), + StructField("nomeCidadao", StringType(), True), + StructField("nomeMaeCidadao", StringType(), True), + StructField("cnsCidadao", LongType(), True), + StructField("cnsResponsavelFamiliar", LongType(), True), + StructField("telefoneCelular", StringType(), True), + StructField("numeroNisPisPasep", LongType(), True), + StructField("paisNascimento", StringType(), True), + StructField("racaCorCidadao", StringType(), True), + StructField("sexoCidadao", StringType(), True), + StructField("statusEhResponsavel", BooleanType(), True), + StructField("etnia", StringType(), True), + StructField("nomePaiCidadao", StringType(), True), + StructField("desconheceNomePai", BooleanType(), True), + StructField("dtNaturalizacao", DateType(), True), + StructField("portariaNaturalizacao", StringType(), True), + StructField("dtEntradaBrasil", DateType(), True), + StructField("microarea", LongType(), True), + StructField("stForaArea", BooleanType(), True), + StructField("cpfCidadao", StringType(), True), + StructField("cpfResponsavelFamiliar", StringType(), True) + ]), True), + StructField("InformacoesSocioDemograficas", StructType([ + StructField("deficienciasCidadao", ArrayType(StringType()), True), + StructField("grauInstrucaoCidadao", StringType(), True), + StructField("ocupacao", StringType(), True), + StructField("orientacaoSexualCidadao", StringType(), True), + StructField("relacaoParentescoCidadao", StringType(), True), + StructField("situacaoMercadoTrabalhoCidadao", StringType(), True), + StructField("statusDesejaInformarOrientacaoSexual", BooleanType(), True), + StructField("statusFrequentaBenzedeira", BooleanType(), True), + StructField("statusFrequentaEscola", BooleanType(), True), + StructField("statusMembroPovoComunidadeTradicional", BooleanType(), True), + StructField("statusParticipaGrupoComunitario", BooleanType(), True), + StructField("statusPossuiPlanoSaudePrivado", BooleanType(), True), + StructField("statusTemAlgumaDeficiencia", BooleanType(), True), + StructField("identidadeGeneroCidadao", StringType(), True), + StructField("statusDesejaInformarIdentidadeGenero", BooleanType(), True), + StructField("responsavelPorCrianca", StringType(), True), + StructField("coPovoComunidadeTradicional", StringType(), True) + ]), True), + StructField("statusTermoRecusaCadastroIndividualAtencaoBasica", BooleanType(), True), + StructField("saidaCidadaoCadastro", StructType([ + StructField("motivoSaidaCidadao", StringType(), True), + StructField("dataObito", DateType(), True), + StructField("numeroDO", StringType(), True) + ]), True) +]) + + +print("Esquema definido com sucesso.") +# Carregar JSON com o esquema +input_path = "s3a://landing/warehouse/fichas_cadastro_individual_100000.jsonl" +output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" +try: + print("Carregando dados do JSON...") + df = spark.read.schema(schema).json(input_path) + print("Dados carregados com sucesso.") +except Exception as e: + print(f"Erro ao carregar JSON: {e}") + spark.stop() + exit(1) + +try: + print("Gravando dados em formato Parquet...") + df.write.mode("overwrite").parquet(output_path) + print("Dados gravados com sucesso.") +except Exception as e: + print(f"Erro ao gravar Parquet: {e}") +finally: + spark.stop() diff --git a/gen_bronze_20000.py b/gen_bronze_20000.py new file mode 100644 index 0000000..49789c4 --- /dev/null +++ b/gen_bronze_20000.py @@ -0,0 +1,145 @@ +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType + +# Configuração do Spark para acessar o MinIO +print("Iniciando a configuração do Spark...") +spark = SparkSession.builder \ + .appName("Landig to Bronze") \ + .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \ + .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \ + .config("spark.sql.warehouse.dir", "s3a://landing/warehouse") \ + .getOrCreate() + +# Definição completa do esquema +print("Definindo o esquema dos dados...") +schema = StructType([ + StructField("profissionalCNS", LongType(), True), + StructField("cboCodigo_2002", LongType(), True), + StructField("cnes", LongType(), True), + StructField("ine", LongType(), True), + StructField("dataAtendimento", DateType(), True), + StructField("condicoesDeSaude", StructType([ + StructField("descricaoCausaInternacaoEm12Meses", StringType(), True), + StructField("descricaoOutraCondicao1", StringType(), True), + StructField("descricaoOutraCondicao2", StringType(), True), + StructField("descricaoOutraCondicao3", StringType(), True), + StructField("descricaoPlantasMedicinaisUsadas", ArrayType(StringType()), True), + StructField("doencaRespiratoria", ArrayType(StringType()), True), + StructField("doencaRins", ArrayType(StringType()), True), + StructField("maternidadeDeReferencia", StringType(), True), + StructField("situacaoPeso", StringType(), True), + StructField("statusEhDependenteAlcool", BooleanType(), True), + StructField("statusEhDependenteOutrasDrogas", BooleanType(), True), + StructField("statusEhFumante", BooleanType(), True), + StructField("statusEhGestante", BooleanType(), True), + StructField("statusEstaAcamado", BooleanType(), True), + StructField("statusEstaDomiciliado", BooleanType(), True), + StructField("statusTemDiabetes", BooleanType(), True), + StructField("statusTemDoencaRespiratoria", BooleanType(), True), + StructField("statusTemHanseniase", BooleanType(), True), + StructField("statusTemHipertensaoArterial", BooleanType(), True), + StructField("statusTemTeveCancer", BooleanType(), True), + StructField("statusTemTeveDoencasRins", BooleanType(), True), + StructField("statusTemTuberculose", BooleanType(), True), + StructField("statusTeveAvcDerrame", BooleanType(), True), + StructField("statusTeveDoencaCardiaca", BooleanType(), True), + StructField("statusTeveInfarto", BooleanType(), True), + StructField("statusTeveInternadoem12Meses", BooleanType(), True), + StructField("statusUsaOutrasPraticasIntegrativasOuComplementares", BooleanType(), True), + StructField("statusUsaPlantasMedicinais", BooleanType(), True), + StructField("statusDiagnosticoMental", StringType(), True) + ]), True), + StructField("emSituacaoDeRua", StructType([ + StructField("grauParentescoFamiliarFrequentado", StringType(), True), + StructField("higienePessoalSituacaoRua", ArrayType(StringType()), True), + StructField("origemAlimentoSituacaoRua", ArrayType(StringType()), True), + StructField("outraInstituicaoQueAcompanha", StringType(), True), + StructField("quantidadeAlimentacoesAoDiaSituacaoRua", StringType(), True), + StructField("statusAcompanhadoPorOutraInstituicao", BooleanType(), True), + StructField("statusPossuiReferenciaFamiliar", BooleanType(), True), + StructField("statusRecebeBeneficio", BooleanType(), True), + StructField("statusSituacaoRua", BooleanType(), True), + StructField("statusTemAcessoHigienePessoalSituacaoRua", BooleanType(), True), + StructField("statusVisitaFamiliarFrequentemente", BooleanType(), True), + StructField("tempoSituacaoRua", StringType(), True) + ]), True), + StructField("identificacaoUsuarioCidadao", StructType([ + StructField("nomeSocial", StringType(), True), + StructField("município", StringType(), True), + StructField("dataNascimentoCidadao", DateType(), True), + StructField("emailCidadao", StringType(), True), + StructField("nacionalidadeCidadao", StringType(), True), + StructField("nomeCidadao", StringType(), True), + StructField("nomeMaeCidadao", StringType(), True), + StructField("cnsCidadao", LongType(), True), + StructField("cnsResponsavelFamiliar", LongType(), True), + StructField("telefoneCelular", StringType(), True), + StructField("numeroNisPisPasep", LongType(), True), + StructField("paisNascimento", StringType(), True), + StructField("racaCorCidadao", StringType(), True), + StructField("sexoCidadao", StringType(), True), + StructField("statusEhResponsavel", BooleanType(), True), + StructField("etnia", StringType(), True), + StructField("nomePaiCidadao", StringType(), True), + StructField("desconheceNomePai", BooleanType(), True), + StructField("dtNaturalizacao", DateType(), True), + StructField("portariaNaturalizacao", StringType(), True), + StructField("dtEntradaBrasil", DateType(), True), + StructField("microarea", LongType(), True), + StructField("stForaArea", BooleanType(), True), + StructField("cpfCidadao", StringType(), True), + StructField("cpfResponsavelFamiliar", StringType(), True) + ]), True), + StructField("InformacoesSocioDemograficas", StructType([ + StructField("deficienciasCidadao", ArrayType(StringType()), True), + StructField("grauInstrucaoCidadao", StringType(), True), + StructField("ocupacao", StringType(), True), + StructField("orientacaoSexualCidadao", StringType(), True), + StructField("relacaoParentescoCidadao", StringType(), True), + StructField("situacaoMercadoTrabalhoCidadao", StringType(), True), + StructField("statusDesejaInformarOrientacaoSexual", BooleanType(), True), + StructField("statusFrequentaBenzedeira", BooleanType(), True), + StructField("statusFrequentaEscola", BooleanType(), True), + StructField("statusMembroPovoComunidadeTradicional", BooleanType(), True), + StructField("statusParticipaGrupoComunitario", BooleanType(), True), + StructField("statusPossuiPlanoSaudePrivado", BooleanType(), True), + StructField("statusTemAlgumaDeficiencia", BooleanType(), True), + StructField("identidadeGeneroCidadao", StringType(), True), + StructField("statusDesejaInformarIdentidadeGenero", BooleanType(), True), + StructField("responsavelPorCrianca", StringType(), True), + StructField("coPovoComunidadeTradicional", StringType(), True) + ]), True), + StructField("statusTermoRecusaCadastroIndividualAtencaoBasica", BooleanType(), True), + StructField("saidaCidadaoCadastro", StructType([ + StructField("motivoSaidaCidadao", StringType(), True), + StructField("dataObito", DateType(), True), + StructField("numeroDO", StringType(), True) + ]), True) +]) + + +print("Esquema definido com sucesso.") +# Carregar JSON com o esquema +input_path = "s3a://landing/warehouse/fichas_cadastro_individual_20000.jsonl" +output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" +try: + print("Carregando dados do JSON...") + df = spark.read.schema(schema).json(input_path) + print("Dados carregados com sucesso.") +except Exception as e: + print(f"Erro ao carregar JSON: {e}") + spark.stop() + exit(1) + +try: + print("Gravando dados em formato Parquet...") + df.write.mode("overwrite").parquet(output_path) + print("Dados gravados com sucesso.") +except Exception as e: + print(f"Erro ao gravar Parquet: {e}") +finally: + spark.stop() diff --git a/gen_bronze_200000.py b/gen_bronze_200000.py new file mode 100644 index 0000000..77dadea --- /dev/null +++ b/gen_bronze_200000.py @@ -0,0 +1,145 @@ +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType + +# Configuração do Spark para acessar o MinIO +print("Iniciando a configuração do Spark...") +spark = SparkSession.builder \ + .appName("Landig to Bronze") \ + .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \ + .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \ + .config("spark.sql.warehouse.dir", "s3a://landing/warehouse") \ + .getOrCreate() + +# Definição completa do esquema +print("Definindo o esquema dos dados...") +schema = StructType([ + StructField("profissionalCNS", LongType(), True), + StructField("cboCodigo_2002", LongType(), True), + StructField("cnes", LongType(), True), + StructField("ine", LongType(), True), + StructField("dataAtendimento", DateType(), True), + StructField("condicoesDeSaude", StructType([ + StructField("descricaoCausaInternacaoEm12Meses", StringType(), True), + StructField("descricaoOutraCondicao1", StringType(), True), + StructField("descricaoOutraCondicao2", StringType(), True), + StructField("descricaoOutraCondicao3", StringType(), True), + StructField("descricaoPlantasMedicinaisUsadas", ArrayType(StringType()), True), + StructField("doencaRespiratoria", ArrayType(StringType()), True), + StructField("doencaRins", ArrayType(StringType()), True), + StructField("maternidadeDeReferencia", StringType(), True), + StructField("situacaoPeso", StringType(), True), + StructField("statusEhDependenteAlcool", BooleanType(), True), + StructField("statusEhDependenteOutrasDrogas", BooleanType(), True), + StructField("statusEhFumante", BooleanType(), True), + StructField("statusEhGestante", BooleanType(), True), + StructField("statusEstaAcamado", BooleanType(), True), + StructField("statusEstaDomiciliado", BooleanType(), True), + StructField("statusTemDiabetes", BooleanType(), True), + StructField("statusTemDoencaRespiratoria", BooleanType(), True), + StructField("statusTemHanseniase", BooleanType(), True), + StructField("statusTemHipertensaoArterial", BooleanType(), True), + StructField("statusTemTeveCancer", BooleanType(), True), + StructField("statusTemTeveDoencasRins", BooleanType(), True), + StructField("statusTemTuberculose", BooleanType(), True), + StructField("statusTeveAvcDerrame", BooleanType(), True), + StructField("statusTeveDoencaCardiaca", BooleanType(), True), + StructField("statusTeveInfarto", BooleanType(), True), + StructField("statusTeveInternadoem12Meses", BooleanType(), True), + StructField("statusUsaOutrasPraticasIntegrativasOuComplementares", BooleanType(), True), + StructField("statusUsaPlantasMedicinais", BooleanType(), True), + StructField("statusDiagnosticoMental", StringType(), True) + ]), True), + StructField("emSituacaoDeRua", StructType([ + StructField("grauParentescoFamiliarFrequentado", StringType(), True), + StructField("higienePessoalSituacaoRua", ArrayType(StringType()), True), + StructField("origemAlimentoSituacaoRua", ArrayType(StringType()), True), + StructField("outraInstituicaoQueAcompanha", StringType(), True), + StructField("quantidadeAlimentacoesAoDiaSituacaoRua", StringType(), True), + StructField("statusAcompanhadoPorOutraInstituicao", BooleanType(), True), + StructField("statusPossuiReferenciaFamiliar", BooleanType(), True), + StructField("statusRecebeBeneficio", BooleanType(), True), + StructField("statusSituacaoRua", BooleanType(), True), + StructField("statusTemAcessoHigienePessoalSituacaoRua", BooleanType(), True), + StructField("statusVisitaFamiliarFrequentemente", BooleanType(), True), + StructField("tempoSituacaoRua", StringType(), True) + ]), True), + StructField("identificacaoUsuarioCidadao", StructType([ + StructField("nomeSocial", StringType(), True), + StructField("município", StringType(), True), + StructField("dataNascimentoCidadao", DateType(), True), + StructField("emailCidadao", StringType(), True), + StructField("nacionalidadeCidadao", StringType(), True), + StructField("nomeCidadao", StringType(), True), + StructField("nomeMaeCidadao", StringType(), True), + StructField("cnsCidadao", LongType(), True), + StructField("cnsResponsavelFamiliar", LongType(), True), + StructField("telefoneCelular", StringType(), True), + StructField("numeroNisPisPasep", LongType(), True), + StructField("paisNascimento", StringType(), True), + StructField("racaCorCidadao", StringType(), True), + StructField("sexoCidadao", StringType(), True), + StructField("statusEhResponsavel", BooleanType(), True), + StructField("etnia", StringType(), True), + StructField("nomePaiCidadao", StringType(), True), + StructField("desconheceNomePai", BooleanType(), True), + StructField("dtNaturalizacao", DateType(), True), + StructField("portariaNaturalizacao", StringType(), True), + StructField("dtEntradaBrasil", DateType(), True), + StructField("microarea", LongType(), True), + StructField("stForaArea", BooleanType(), True), + StructField("cpfCidadao", StringType(), True), + StructField("cpfResponsavelFamiliar", StringType(), True) + ]), True), + StructField("InformacoesSocioDemograficas", StructType([ + StructField("deficienciasCidadao", ArrayType(StringType()), True), + StructField("grauInstrucaoCidadao", StringType(), True), + StructField("ocupacao", StringType(), True), + StructField("orientacaoSexualCidadao", StringType(), True), + StructField("relacaoParentescoCidadao", StringType(), True), + StructField("situacaoMercadoTrabalhoCidadao", StringType(), True), + StructField("statusDesejaInformarOrientacaoSexual", BooleanType(), True), + StructField("statusFrequentaBenzedeira", BooleanType(), True), + StructField("statusFrequentaEscola", BooleanType(), True), + StructField("statusMembroPovoComunidadeTradicional", BooleanType(), True), + StructField("statusParticipaGrupoComunitario", BooleanType(), True), + StructField("statusPossuiPlanoSaudePrivado", BooleanType(), True), + StructField("statusTemAlgumaDeficiencia", BooleanType(), True), + StructField("identidadeGeneroCidadao", StringType(), True), + StructField("statusDesejaInformarIdentidadeGenero", BooleanType(), True), + StructField("responsavelPorCrianca", StringType(), True), + StructField("coPovoComunidadeTradicional", StringType(), True) + ]), True), + StructField("statusTermoRecusaCadastroIndividualAtencaoBasica", BooleanType(), True), + StructField("saidaCidadaoCadastro", StructType([ + StructField("motivoSaidaCidadao", StringType(), True), + StructField("dataObito", DateType(), True), + StructField("numeroDO", StringType(), True) + ]), True) +]) + + +print("Esquema definido com sucesso.") +# Carregar JSON com o esquema +input_path = "s3a://landing/warehouse/fichas_cadastro_individual_200000.jsonl" +output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" +try: + print("Carregando dados do JSON...") + df = spark.read.schema(schema).json(input_path) + print("Dados carregados com sucesso.") +except Exception as e: + print(f"Erro ao carregar JSON: {e}") + spark.stop() + exit(1) + +try: + print("Gravando dados em formato Parquet...") + df.write.mode("overwrite").parquet(output_path) + print("Dados gravados com sucesso.") +except Exception as e: + print(f"Erro ao gravar Parquet: {e}") +finally: + spark.stop() diff --git a/gen_bronze.py b/gen_bronze_5000.py similarity index 99% rename from gen_bronze.py rename to gen_bronze_5000.py index 914fe78..f6ba772 100644 --- a/gen_bronze.py +++ b/gen_bronze_5000.py @@ -124,7 +124,7 @@ schema = StructType([ print("Esquema definido com sucesso.") # Carregar JSON com o esquema -input_path = "s3a://landing/warehouse/fichas_cadastro_individual_1000.jsonl" +input_path = "s3a://landing/warehouse/fichas_cadastro_individual_5000.jsonl" output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" try: print("Carregando dados do JSON...") diff --git a/gen_bronze_50000.py b/gen_bronze_50000.py new file mode 100644 index 0000000..248b866 --- /dev/null +++ b/gen_bronze_50000.py @@ -0,0 +1,145 @@ +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType + +# Configuração do Spark para acessar o MinIO +print("Iniciando a configuração do Spark...") +spark = SparkSession.builder \ + .appName("Landig to Bronze") \ + .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio-cluster.svc.cluster.local:9000") \ + .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \ + .config("spark.sql.warehouse.dir", "s3a://landing/warehouse") \ + .getOrCreate() + +# Definição completa do esquema +print("Definindo o esquema dos dados...") +schema = StructType([ + StructField("profissionalCNS", LongType(), True), + StructField("cboCodigo_2002", LongType(), True), + StructField("cnes", LongType(), True), + StructField("ine", LongType(), True), + StructField("dataAtendimento", DateType(), True), + StructField("condicoesDeSaude", StructType([ + StructField("descricaoCausaInternacaoEm12Meses", StringType(), True), + StructField("descricaoOutraCondicao1", StringType(), True), + StructField("descricaoOutraCondicao2", StringType(), True), + StructField("descricaoOutraCondicao3", StringType(), True), + StructField("descricaoPlantasMedicinaisUsadas", ArrayType(StringType()), True), + StructField("doencaRespiratoria", ArrayType(StringType()), True), + StructField("doencaRins", ArrayType(StringType()), True), + StructField("maternidadeDeReferencia", StringType(), True), + StructField("situacaoPeso", StringType(), True), + StructField("statusEhDependenteAlcool", BooleanType(), True), + StructField("statusEhDependenteOutrasDrogas", BooleanType(), True), + StructField("statusEhFumante", BooleanType(), True), + StructField("statusEhGestante", BooleanType(), True), + StructField("statusEstaAcamado", BooleanType(), True), + StructField("statusEstaDomiciliado", BooleanType(), True), + StructField("statusTemDiabetes", BooleanType(), True), + StructField("statusTemDoencaRespiratoria", BooleanType(), True), + StructField("statusTemHanseniase", BooleanType(), True), + StructField("statusTemHipertensaoArterial", BooleanType(), True), + StructField("statusTemTeveCancer", BooleanType(), True), + StructField("statusTemTeveDoencasRins", BooleanType(), True), + StructField("statusTemTuberculose", BooleanType(), True), + StructField("statusTeveAvcDerrame", BooleanType(), True), + StructField("statusTeveDoencaCardiaca", BooleanType(), True), + StructField("statusTeveInfarto", BooleanType(), True), + StructField("statusTeveInternadoem12Meses", BooleanType(), True), + StructField("statusUsaOutrasPraticasIntegrativasOuComplementares", BooleanType(), True), + StructField("statusUsaPlantasMedicinais", BooleanType(), True), + StructField("statusDiagnosticoMental", StringType(), True) + ]), True), + StructField("emSituacaoDeRua", StructType([ + StructField("grauParentescoFamiliarFrequentado", StringType(), True), + StructField("higienePessoalSituacaoRua", ArrayType(StringType()), True), + StructField("origemAlimentoSituacaoRua", ArrayType(StringType()), True), + StructField("outraInstituicaoQueAcompanha", StringType(), True), + StructField("quantidadeAlimentacoesAoDiaSituacaoRua", StringType(), True), + StructField("statusAcompanhadoPorOutraInstituicao", BooleanType(), True), + StructField("statusPossuiReferenciaFamiliar", BooleanType(), True), + StructField("statusRecebeBeneficio", BooleanType(), True), + StructField("statusSituacaoRua", BooleanType(), True), + StructField("statusTemAcessoHigienePessoalSituacaoRua", BooleanType(), True), + StructField("statusVisitaFamiliarFrequentemente", BooleanType(), True), + StructField("tempoSituacaoRua", StringType(), True) + ]), True), + StructField("identificacaoUsuarioCidadao", StructType([ + StructField("nomeSocial", StringType(), True), + StructField("município", StringType(), True), + StructField("dataNascimentoCidadao", DateType(), True), + StructField("emailCidadao", StringType(), True), + StructField("nacionalidadeCidadao", StringType(), True), + StructField("nomeCidadao", StringType(), True), + StructField("nomeMaeCidadao", StringType(), True), + StructField("cnsCidadao", LongType(), True), + StructField("cnsResponsavelFamiliar", LongType(), True), + StructField("telefoneCelular", StringType(), True), + StructField("numeroNisPisPasep", LongType(), True), + StructField("paisNascimento", StringType(), True), + StructField("racaCorCidadao", StringType(), True), + StructField("sexoCidadao", StringType(), True), + StructField("statusEhResponsavel", BooleanType(), True), + StructField("etnia", StringType(), True), + StructField("nomePaiCidadao", StringType(), True), + StructField("desconheceNomePai", BooleanType(), True), + StructField("dtNaturalizacao", DateType(), True), + StructField("portariaNaturalizacao", StringType(), True), + StructField("dtEntradaBrasil", DateType(), True), + StructField("microarea", LongType(), True), + StructField("stForaArea", BooleanType(), True), + StructField("cpfCidadao", StringType(), True), + StructField("cpfResponsavelFamiliar", StringType(), True) + ]), True), + StructField("InformacoesSocioDemograficas", StructType([ + StructField("deficienciasCidadao", ArrayType(StringType()), True), + StructField("grauInstrucaoCidadao", StringType(), True), + StructField("ocupacao", StringType(), True), + StructField("orientacaoSexualCidadao", StringType(), True), + StructField("relacaoParentescoCidadao", StringType(), True), + StructField("situacaoMercadoTrabalhoCidadao", StringType(), True), + StructField("statusDesejaInformarOrientacaoSexual", BooleanType(), True), + StructField("statusFrequentaBenzedeira", BooleanType(), True), + StructField("statusFrequentaEscola", BooleanType(), True), + StructField("statusMembroPovoComunidadeTradicional", BooleanType(), True), + StructField("statusParticipaGrupoComunitario", BooleanType(), True), + StructField("statusPossuiPlanoSaudePrivado", BooleanType(), True), + StructField("statusTemAlgumaDeficiencia", BooleanType(), True), + StructField("identidadeGeneroCidadao", StringType(), True), + StructField("statusDesejaInformarIdentidadeGenero", BooleanType(), True), + StructField("responsavelPorCrianca", StringType(), True), + StructField("coPovoComunidadeTradicional", StringType(), True) + ]), True), + StructField("statusTermoRecusaCadastroIndividualAtencaoBasica", BooleanType(), True), + StructField("saidaCidadaoCadastro", StructType([ + StructField("motivoSaidaCidadao", StringType(), True), + StructField("dataObito", DateType(), True), + StructField("numeroDO", StringType(), True) + ]), True) +]) + + +print("Esquema definido com sucesso.") +# Carregar JSON com o esquema +input_path = "s3a://landing/warehouse/fichas_cadastro_individual_50000.jsonl" +output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" +try: + print("Carregando dados do JSON...") + df = spark.read.schema(schema).json(input_path) + print("Dados carregados com sucesso.") +except Exception as e: + print(f"Erro ao carregar JSON: {e}") + spark.stop() + exit(1) + +try: + print("Gravando dados em formato Parquet...") + df.write.mode("overwrite").parquet(output_path) + print("Dados gravados com sucesso.") +except Exception as e: + print(f"Erro ao gravar Parquet: {e}") +finally: + spark.stop() -- GitLab