From 132915b85e85642c0e80243dfddd94388635a18c Mon Sep 17 00:00:00 2001 From: edvs19 <edvs19@inf.ufpr.br> Date: Thu, 6 Mar 2025 17:52:56 -0300 Subject: [PATCH] teste --- teste_dag.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/teste_dag.py b/teste_dag.py index 15ab58c..8659ec6 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -8,13 +8,15 @@ from airflow.models.param import Param from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator + repo_home = '/opt/airflow/dags/repo' @dag( dag_id="process_records_teste", params={ - "input_file": Param("fichas_cadastro_individual_5000.jsonl", type="string"), + "input_file": Param("invalid_file.jsonl", type="string"), }, ) def process_records(): @@ -41,4 +43,25 @@ def process_records(): task_spark >> task_dbt_setup >> task_bronze_to_gold -dag = process_records() +dag1 = process_records() + +@dag( + dag_id="run_tests", + params={ + "files": Param([], type="array"), + }, +) +def run_tests(): + files = "{{ params.my_string_list }}" + + ops = [] + for file in files: + op = TriggerDagRunOperator( + trigger_dag_id=dag1.dag_id, + conf={ "input_file": file } + ) + + ops.append(op) + +dag2 = run_tests() + -- GitLab