"""Airflow DAGs for the landing -> bronze -> silver -> gold pipeline.

DAGs defined here:
- ``process_records``: Spark job (landing -> bronze) followed by a dbt
  build (bronze -> silver -> gold).
- ``teste``: small demo DAG that logs the ``input_file`` param.
- ``run_tests``: triggers ``other_dag`` (currently not registered).
"""

import datetime

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.models.param import Param

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Root of the checked-out DAG repository inside the Airflow container.
repo_home = '/opt/airflow/dags/repo'


@dag(
    params={
        "input_file": Param(
            "nofile.jsonl",
            description="jsonl to be used as input in landing to bronze",
            # Param types are JSON-schema type names ("string"), not
            # Python classes; passing `str` positionally was a TypeError.
            type="string",
        )
    }
)
def process_records():
    """DAG running the landing -> bronze -> silver -> gold flow.

    NOTE: a TaskFlow ``@dag`` function must not declare a required
    ``params`` argument — params are reachable at runtime through the
    template context (``{{ params.input_file }}``). The old signature
    made the factory uncallable.
    """
    # Spark job that ingests the landing jsonl into the bronze layer.
    landing_to_bronze = SparkSubmitOperator(
        application=f"{repo_home}/gen_bronze.py",
        task_id="landing_to_bronze",
        packages="org.apache.hadoop:hadoop-aws:3.3.4",
    )

    # Copy the dbt project to a writable location before building.
    dbt_setup = BashOperator(
        task_id="dbt_setup",
        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
    )

    # dbt resolves packages and builds silver + gold models.
    bronze_to_silver_to_gold = BashOperator(
        task_id="bronze_to_silver_to_gold",
        cwd="/tmp/proj_teste",
        bash_command="dbt deps && dbt build",
    )

    landing_to_bronze >> dbt_setup >> bronze_to_silver_to_gold


@dag(
    params={
        # Declared via the decorator, not a mutable default argument.
        "input_file": Param("nofile.jsonl", type="string")
    }
)
def teste():
    """Demo DAG: logs the ``input_file`` param from a task.

    The original printed at DAG-definition (parse) time and read
    ``params.input_file`` as attribute access on a dict, which raises
    AttributeError; params must be read inside a task via the context.
    """

    @task
    def show_input(**context):
        # Runtime params are a dict in the template context.
        print(context["params"]["input_file"])

    show_input()


@dag()
def run_tests():
    """DAG that fans out to ``other_dag``, forwarding ``file_path``."""
    TriggerDagRunOperator(
        trigger_dag_id="other_dag",
        task_id="teste",
        # Jinja-rendered at trigger time from this DAG run's params.
        conf={"file_path": "{{ params.file_path }}"},
    )


# Instantiate the DAG factories so the scheduler registers them.
# (process_records was previously never called, so it never appeared.)
dag1 = process_records()
dag2 = teste()
# dag3 = run_tests()