diff --git a/teste_dag.py b/teste_dag.py index 15ab58c8725d231c01386cc8fd17c43ff0b11cf5..8659ec6ba90311ef5ae6502106a79c33a1d19ac0 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -8,13 +8,15 @@ from airflow.models.param import Param from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from airflow.operators.trigger_dagrun import TriggerDagRunOperator + repo_home = '/opt/airflow/dags/repo' @dag( dag_id="process_records_teste", params={ - "input_file": Param("fichas_cadastro_individual_5000.jsonl", type="string"), + "input_file": Param("invalid_file.jsonl", type="string"), }, ) def process_records(): @@ -41,4 +43,25 @@ def process_records(): task_spark >> task_dbt_setup >> task_bronze_to_gold -dag = process_records() +dag1 = process_records() + +@dag( + dag_id="run_tests", + params={ + "files": Param([], type="array"), + }, +) +def run_tests(): + files = "{{ params.my_string_list }}" + + ops = [] + for file in files: + op = TriggerDagRunOperator( + trigger_dag_id=dag1.dag_id, + conf={ "input_file": file } + ) + + ops.append(op) + +dag2 = run_tests() +