diff --git a/teste_dag.py b/teste_dag.py
index f5470ef66740ab540a23fef4f3a96821bc536a90..e573262b9985ecc188160b213f8078332502ee58 100644
--- a/teste_dag.py
+++ b/teste_dag.py
@@ -51,28 +51,41 @@
 from airflow import DAG
 from airflow.decorators import task
 from airflow.models.param import Param
+from airflow.decorators import dag
+from airflow.operators.bash import BashOperator
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
-with DAG(
-    "the_dag",
+@dag(
+    dag_id="process_records_teste",
     params={
-        "x": Param(5, type="integer", minimum=3),
-        "my_int_param": 6
+        "input_file": Param(type="string"),
     },
-) as dag:
-
-    @task.python
-    def example_task(params: dict):
-        # This will print the default value, 6:
-        dag.log.info(dag.params['my_int_param'])
-
-        # This will print the manually-provided value, 42:
-        dag.log.info(params['my_int_param'])
-
-        # This will print the default value, 5, since it wasn't provided manually:
-        dag.log.info(params['x'])
-
-    example_task()
-
-if __name__ == "__main__":
-    dag.test(
-        run_conf={"my_int_param": 42}
-    )
+)
+def process_records():
+    # Wrapping an operator inside a @task function would only return the
+    # operator object without executing it, so the three steps are
+    # instantiated directly in the DAG body instead.
+    task_spark = SparkSubmitOperator(
+        task_id="landing_to_bronze",
+        application=f"{repo_home}/gen_bronze.py",
+        # application_args is a templated field, so the required
+        # "input_file" param is resolved at runtime via Jinja.
+        application_args=["{{ params.input_file }}"],
+        packages="org.apache.hadoop:hadoop-aws:3.3.4",
+    )
+
+    # Copy the dbt project to a writable scratch directory before building.
+    task_dbt_setup = BashOperator(
+        task_id="dbt_setup",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    task_bronze_to_gold = BashOperator(
+        task_id="bronze_to_silver_to_gold",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_spark >> task_dbt_setup >> task_bronze_to_gold
+
+
+dag = process_records()
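
Note on testing: the change removes the old if __name__ == "__main__" block that called dag.test(). Below is a minimal sketch of an equivalent local-test entrypoint for the new DAG; it assumes DAG.test() is available (Airflow 2.5+), that the import lines added in the diff are not already present earlier in teste_dag.py, and that the input path is purely illustrative:

if __name__ == "__main__":
    # "input_file" has no default, so a value must be supplied via run_conf;
    # the path here is a placeholder, not a file from the actual project.
    dag.test(run_conf={"input_file": "/tmp/sample_records.csv"})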