diff --git a/dags.py b/dags.py index 464e40812a7644c2f8a973cf5d6e88353598c40f..932989688bb1b8c11f149ac6d041b2777c79b1e5 100644 --- a/dags.py +++ b/dags.py @@ -21,7 +21,10 @@ def process_records(): for i, size in enumerate(sizes, 1): # SparkSubmitOperator task_spark = SparkSubmitOperator( - application=f"{repo_home}/gen_bronze_{size}.py", + application=f"{repo_home}/gen_bronze.py", + application_args=[ + f"fichas_cadastro_individual_{size}.jsonl" + ], task_id=f"landing_to_bronze_{size}", packages="org.apache.hadoop:hadoop-aws:3.3.4" ) diff --git a/gen_bronze.py b/gen_bronze.py index 2be99842db45618935a8bb066c1179b7bb0fe6ec..68d8b0e59decdd34028570278d7bc5a636f428c0 100644 --- a/gen_bronze.py +++ b/gen_bronze.py @@ -1,6 +1,6 @@ -import sys from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, DateType, ArrayType +import sys # Configuração do Spark para acessar o MinIO print("Iniciando a configuração do Spark...") @@ -95,7 +95,7 @@ schema = StructType([ StructField("cpfCidadao", StringType(), True), StructField("cpfResponsavelFamiliar", StringType(), True) ]), True), - StructField("InformacoesSocioDemograficas", StructType([ + StructField("informacoesSocioDemograficas", StructType([ StructField("deficienciasCidadao", ArrayType(StringType()), True), StructField("grauInstrucaoCidadao", StringType(), True), StructField("ocupacao", StringType(), True), @@ -125,8 +125,8 @@ schema = StructType([ print("Esquema definido com sucesso.") # Carregar JSON com o esquema -filename = sys.argv[1] -input_path = "s3a://landing/warehouse/" + filename +file = sys.argv[1] +input_path = f"s3a://landing/warehouse/{file}" output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" try: print("Carregando dados do JSON...") diff --git a/proj_teste/models/newmodel/new.sql b/proj_teste/models/newmodel/new.sql new file mode 100644 index 0000000000000000000000000000000000000000..9954901bfdadee22b1d9b40fe1cb459b330ccf3c --- /dev/null +++ b/proj_teste/models/newmodel/new.sql @@ -0,0 +1,37 @@ +pessoa +nome_completo identificacaoUsuarioCidadao.nomeCidadao +data_nascimento identificacaoUsuarioCidadao.dataNascimentoCidadao +sexo identificacaoUsuarioCidadao.sexoCidadao +raca_cor identificacaoUsuarioCidadao.racaCorCidadao +etnia identificacaoUsuarioCidadao.etnia +cns identificacaoUsuarioCidadao.cnsCidadao +telefone identificacaoUsuarioCidadao.telefoneCelular +email identificacaoUsuarioCidadao.emailCidadao +nome_social identificacaoUsuarioCidadao.nomeSocial +pis_pasep identificacaoUsuarioCidadao.numeroNisPisPasep +nome_mae identificacaoUsuarioCidadao.nomeMaeCidadao +nome_pai identificacaoUsuarioCidadao.nomePaiCidadao +nacionalidade identificacaoUsuarioCidadao.paisNascimento +orientacao_sexual informacoesSocioDemograficas.orientacaoSexualCidadao +genero informacoesSocioDemograficas.identidadeGeneroCidadao +obito saidaCidadaoCadastro.motivoSaidaCidadao == 'Óbito' +dt_obito saidaCidadaoCadastro.dataObito +num_do_obito saidaCidadaoCadastro.numeroDO + +saude +deficiencia_auditiva 'Auditiva' in informacoesSocioDemograficas.deficienciasCidadao +deficiencia_visual 'Visual' in informacoesSocioDemograficas.deficienciasCidadao +deficiencia_intelectual 'Intelectual / Cognitiva' in informacoesSocioDemograficas.deficienciasCidadao +deficiencia_fisica 'Física' in informacoesSocioDemograficas.deficienciasCidadao +peso_status condicoesDeSaude.situacaoPeso +doenca_coracao condicoesDeSaude.statusTeveDoencaCardiaca +doenca_rim condicoesDeSaude.doencaRins != null +situacao_rua emSituacaoDeRua != null +acompanhado_outra_insituicao emSituacaoDeRua.statusAcompanhadoPorOutraInstituicao +nome_insituicao emSituacaoDeRua.outraInstituicaoQueAcompanha +acesso_higiene_banho 'Banho' in emSituacaoDeRua.higienePessoalSituacaoRua +acesso_higiene_sanitario 'Acesso a sanitário' in emSituacaoDeRua.higienePessoalSituacaoRua +acesso_higiene_bucal 'Higiene bucal' in emSituacaoDeRua.higienePessoalSituacaoRua +outras_higiene 'Outros' in emSituacaoDeRua.higienePessoalSituacaoRua +mudanca_territorio saidaCidadaoCadastro.motivoSaidaCidadao == 'Mudança de território' +situacao_gestacional condicoesDeSaude.statusEhGestante