diff --git a/teste_dag.py b/teste_dag.py
new file mode 100644
index 0000000000000000000000000000000000000000..c76bd6a8201b440bd0af2a7ef6f40952086ac8f3
--- /dev/null
+++ b/teste_dag.py
@@ -0,0 +1,75 @@
+from airflow.decorators import dag, task
+from airflow.operators.bash import BashOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.models.param import Param
+
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+
+
+repo_home = '/opt/airflow/dags/repo'
+
+
+@dag(
+    params={
+        "input_file": Param(
+            "nofile.jsonl",
+            description="jsonl to be used as input in landing to bronze",
+            type="string",
+        )
+    }
+)
+def process_records():
+    """
+    DAG that runs the landing -> bronze -> silver -> gold flow.
+    """
+
+    # Ingest the landing-zone JSONL file into the bronze layer with Spark.
+    task_1 = SparkSubmitOperator(
+        application=f"{repo_home}/gen_bronze.py",
+        task_id="landing_to_bronze",
+        packages="org.apache.hadoop:hadoop-aws:3.3.4",
+    )
+
+    # Copy the dbt project to a writable location before running it.
+    task_2 = BashOperator(
+        task_id="dbt_setup",
+        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+    )
+
+    # Run the dbt models that build the silver and gold layers.
+    task_3 = BashOperator(
+        task_id="bronze_to_silver_to_gold",
+        cwd="/tmp/proj_teste",
+        bash_command="dbt deps && dbt build",
+    )
+
+    task_1 >> task_2 >> task_3
+
+
+@dag(
+    params={
+        "input_file": Param(type="string")
+    }
+)
+def teste():
+    # Params are only resolved at run time, so read them inside a task
+    # rather than at DAG-definition level.
+    @task
+    def show_input_file(params=None):
+        print(params["input_file"])
+
+    show_input_file()
+
+
+@dag()
+def run_tests():
+    TriggerDagRunOperator(
+        trigger_dag_id="other_dag",
+        task_id="teste",
+        conf={"file_path": "{{ params.file_path }}"},
+    )
+
+
+dag1 = process_records()
+dag2 = teste()
+# dag3 = run_tests()