diff --git a/teste_dag.py b/teste_dag.py index caa9438e247d2d0ed30109fa741dce9b708d62ad..f5470ef66740ab540a23fef4f3a96821bc536a90 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -13,35 +13,66 @@ from airflow.models import Variable repo_home = '/opt/airflow/dags/repo' -@dag() -def process_records(): - """ - DAG que realiza o fluxo landing -> bronze -> silver -> gold - """ - # SparkSubmitOperator - task_spark = SparkSubmitOperator( - task_id=f"landing_to_bronze", - application=f"{repo_home}/gen_bronze.py", - application_args=[ - Variable.get("input_file") - ], - packages="org.apache.hadoop:hadoop-aws:3.3.4" - ) +# @dag() +# def process_records(): +# """ +# DAG que realiza o fluxo landing -> bronze -> silver -> gold +# """ +# # SparkSubmitOperator +# task_spark = SparkSubmitOperator( +# task_id=f"landing_to_bronze", +# application=f"{repo_home}/gen_bronze.py", +# application_args=[ +# Variable.get("input_file") +# ], +# packages="org.apache.hadoop:hadoop-aws:3.3.4" +# ) - # BashOperator 1 - dbt_setup - task_dbt_setup = BashOperator( - task_id=f"dbt_setup", - bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", - ) +# # BashOperator 1 - dbt_setup +# task_dbt_setup = BashOperator( +# task_id=f"dbt_setup", +# bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste", +# ) + +# # BashOperator 2 - bronze_to_silver_to_gold +# task_bronze_to_gold = BashOperator( +# task_id=f"bronze_to_silver_to_gold", +# cwd="/tmp/proj_teste", +# bash_command="dbt deps && dbt build", +# ) - # BashOperator 2 - bronze_to_silver_to_gold - task_bronze_to_gold = BashOperator( - task_id=f"bronze_to_silver_to_gold", - cwd="/tmp/proj_teste", - bash_command="dbt deps && dbt build", - ) +# task_spark >> task_dbt_setup >> task_bronze_to_gold - task_spark >> task_dbt_setup >> task_bronze_to_gold +# dag = process_records() + + +from airflow import DAG +from airflow.decorators import task +from airflow.models.param import Param -dag = process_records() +with DAG( + "the_dag", + params={ + "x": Param(5, type="integer", minimum=3), + "my_int_param": 6 + }, +) as dag: + + @task.python + def example_task(params: dict): + # This will print the default value, 6: + dag.log.info(dag.params['my_int_param']) + + # This will print the manually-provided value, 42: + dag.log.info(params['my_int_param']) + + # This will print the default value, 5, since it wasn't provided manually: + dag.log.info(params['x']) + + example_task() + +if __name__ == "__main__": + dag.test( + run_conf={"my_int_param": 42} + )