diff --git a/teste_dag.py b/teste_dag.py index 630cea9fd15a2f19a3d9734a16b3587d19422c61..adab9b8d052d9a36c42d63c6cfd5c4c637099234 100644 --- a/teste_dag.py +++ b/teste_dag.py @@ -12,19 +12,18 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator repo_home = '/opt/airflow/dags/repo' - -@dag( +############# PROCESS A SINGLE RECRODS FILE ############### +with DAG( dag_id="process_records_teste", params={ "input_file": Param("invalid_file.jsonl", type="string"), }, -) -def process_records(): +) as dag1: task_spark = SparkSubmitOperator( task_id="landing_to_bronze", application=f"{repo_home}/gen_bronze.py", application_args=[ - "{{ params.input_file }}" + dag1.params.input_file ], packages="org.apache.hadoop:hadoop-aws:3.3.4" ) @@ -43,16 +42,15 @@ def process_records(): task_spark >> task_dbt_setup >> task_bronze_to_gold -dag1 = process_records() +############# RUN TESTS DAG ############## -@dag( +with DAG( dag_id="run_tests", params={ "files": Param([], type="array"), }, -) -def run_tests(): - files = "{{ params.my_string_list }}" +) as dag2: + files = dag2.params.files ops = [] for file in files: @@ -63,6 +61,3 @@ def run_tests(): ) ops.append(op) - -dag2 = run_tests() -