Skip to content
Snippets Groups Projects
Commit 735dac70 authored by edvs19's avatar edvs19
Browse files

teste

parent 54757474
Branches
No related tags found
No related merge requests found
...@@ -125,7 +125,7 @@ schema = StructType([ ...@@ -125,7 +125,7 @@ schema = StructType([
print("Esquema definido com sucesso.") print("Esquema definido com sucesso.")
# Carregar JSON com o esquema # Carregar JSON com o esquema
input_path = "s3a://landing/" + sys.args[1] input_path = "s3a://landing/" + sys.argv[1]
output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet" output_path = "s3a://bronze/warehouse/fichas_cadastro_individual_parquet"
try: try:
print("Carregando dados do JSON...") print("Carregando dados do JSON...")
......
...@@ -4,52 +4,12 @@ from airflow import DAG ...@@ -4,52 +4,12 @@ from airflow import DAG
from airflow.decorators import dag, task from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator from airflow.operators.bash import BashOperator
from airflow.models.param import Param
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.models import Variable
repo_home = '/opt/airflow/dags/repo' repo_home = '/opt/airflow/dags/repo'
# @dag()
# def process_records():
# """
# DAG que realiza o fluxo landing -> bronze -> silver -> gold
# """
# # SparkSubmitOperator
# task_spark = SparkSubmitOperator(
# task_id=f"landing_to_bronze",
# application=f"{repo_home}/gen_bronze.py",
# application_args=[
# Variable.get("input_file")
# ],
# packages="org.apache.hadoop:hadoop-aws:3.3.4"
# )
# # BashOperator 1 - dbt_setup
# task_dbt_setup = BashOperator(
# task_id=f"dbt_setup",
# bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
# )
# # BashOperator 2 - bronze_to_silver_to_gold
# task_bronze_to_gold = BashOperator(
# task_id=f"bronze_to_silver_to_gold",
# cwd="/tmp/proj_teste",
# bash_command="dbt deps && dbt build",
# )
# task_spark >> task_dbt_setup >> task_bronze_to_gold
# dag = process_records()
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
@dag( @dag(
dag_id="process_records_teste", dag_id="process_records_teste",
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment