From 0f8dba1cc3fc6f1515f5f5087fdfc885d614a3d1 Mon Sep 17 00:00:00 2001 From: edvs19 <edvs19@inf.ufpr.br> Date: Thu, 6 Mar 2025 17:57:54 -0300 Subject: [PATCH] TESTE --- teste_dag.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/teste_dag.py b/teste_dag.py index 630cea9..adab9b8 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -12,19 +12,18 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator repo_home = '/opt/airflow/dags/repo' - -@dag( +############# PROCESS A SINGLE RECRODS FILE ############### +with DAG( dag_id="process_records_teste", params={ "input_file": Param("invalid_file.jsonl", type="string"), }, -) -def process_records(): +) as dag1: task_spark = SparkSubmitOperator( task_id="landing_to_bronze", application=f"{repo_home}/gen_bronze.py", application_args=[ - "{{ params.input_file }}" + dag1.params.input_file ], packages="org.apache.hadoop:hadoop-aws:3.3.4" ) @@ -43,16 +42,15 @@ def process_records(): task_spark >> task_dbt_setup >> task_bronze_to_gold -dag1 = process_records() +############# RUN TESTS DAG ############## -@dag( +with DAG( dag_id="run_tests", params={ "files": Param([], type="array"), }, -) -def run_tests(): - files = "{{ params.my_string_list }}" +) as dag2: + files = dag2.params.files ops = [] for file in files: @@ -63,6 +61,3 @@ def run_tests(): ) ops.append(op) - -dag2 = run_tests() - -- GitLab