From b575cb2422f95bcf7c10c09267298462c4051181 Mon Sep 17 00:00:00 2001
From: edvs19 <edvs19@inf.ufpr.br>
Date: Wed, 12 Mar 2025 01:16:50 -0300
Subject: [PATCH] test

---
 dags.py      |   2 +-
 teste_dag.py | 114 +++++++++++++++++++++++++--------------------------
 2 files changed, 58 insertions(+), 58 deletions(-)

diff --git a/dags.py b/dags.py
index 9329896..95c5fdc 100644
--- a/dags.py
+++ b/dags.py
@@ -55,4 +55,4 @@ def process_records():
     for i in range(1, len(tasks)):
         tasks[i-1][1] >> tasks[i][0]
 
-dag = process_records()
\ No newline at end of file
+dag = process_records()
diff --git a/teste_dag.py b/teste_dag.py
index c73e278..947ca90 100644
--- a/teste_dag.py
+++ b/teste_dag.py
@@ -1,73 +1,73 @@
-import datetime
+# import datetime
 
-from airflow import DAG
-from airflow.decorators import dag, task
-from airflow.operators.empty import EmptyOperator
-from airflow.operators.bash import BashOperator
-from airflow.models.param import Param
+# from airflow import DAG
+# from airflow.decorators import dag, task
+# from airflow.operators.empty import EmptyOperator
+# from airflow.operators.bash import BashOperator
+# from airflow.models.param import Param
 
-from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+# from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
 
-from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+# from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 
-repo_home = '/opt/airflow/dags/repo'
+# repo_home = '/opt/airflow/dags/repo'
 
-############# PROCESS A SINGLE RECRODS FILE ###############
-with DAG(
-    dag_id="process_records_teste",
-    params={
-        "input_file": Param("invalid_file.jsonl", type="string"),
-    },
-) as dag1:
-    file = dag1.params["input_file"]
+# ############# PROCESS A SINGLE RECRODS FILE ###############
+# with DAG(
+#     dag_id="process_records_teste",
+#     params={
+#         "input_file": Param("invalid_file.jsonl", type="string"),
+#     },
+# ) as dag1:
+#     file = dag1.params["input_file"]
 
-    dag1.log.info(f"Input file { file }")
+#     dag1.log.info(f"Input file { file }")
 
-    task_spark = SparkSubmitOperator(
-        task_id="landing_to_bronze",
-        application=f"{repo_home}/gen_bronze.py",
-        application_args=[
-            file
-        ],
-        packages="org.apache.hadoop:hadoop-aws:3.3.4"
-    )
+#     task_spark = SparkSubmitOperator(
+#         task_id="landing_to_bronze",
+#         application=f"{repo_home}/gen_bronze.py",
+#         application_args=[
+#             file
+#         ],
+#         packages="org.apache.hadoop:hadoop-aws:3.3.4"
+#     )
 
-    task_dbt_setup = BashOperator(
-        task_id="dbt_setup",
-        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
-    )
+#     task_dbt_setup = BashOperator(
+#         task_id="dbt_setup",
+#         bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+#     )
 
-    task_bronze_to_gold = BashOperator(
-        task_id="bronze_to_silver_to_gold",
-        cwd="/tmp/proj_teste",
-        bash_command="dbt deps && dbt build",
-    )
+#     task_bronze_to_gold = BashOperator(
+#         task_id="bronze_to_silver_to_gold",
+#         cwd="/tmp/proj_teste",
+#         bash_command="dbt deps && dbt build",
+#     )
 
-    task_spark >> task_dbt_setup >> task_bronze_to_gold
+#     task_spark >> task_dbt_setup >> task_bronze_to_gold
 
 
-############# RUN TESTS DAG ##############
+# ############# RUN TESTS DAG ##############
 
-with DAG(
-    dag_id="run_tests",
-    params={
-        "files": Param([], type="array"),
-    },
-) as dag2:
-    files = dag2.params["files"]
+# with DAG(
+#     dag_id="run_tests",
+#     params={
+#         "files": Param([], type="array"),
+#     },
+# ) as dag2:
+#     files = dag2.params["files"]
 
-    dag2.log.info(f"Input file { files }")
+#     dag2.log.info(f"Input file { files }")
 
-    ops = []
-    for file in files:
-        op = TriggerDagRunOperator(
-            task_id=file,
-            trigger_dag_id=dag1.dag_id,
-            conf={ "input_file": file }
-        )
-        ops.append(op)
+#     ops = []
+#     for file in files:
+#         op = TriggerDagRunOperator(
+#             task_id=file,
+#             trigger_dag_id=dag1.dag_id,
+#             conf={ "input_file": file }
+#         )
+#         ops.append(op)
 
-    # Chain operators
-    if len(ops) > 0:
-        for i in range(len(ops) - 1):
-            ops[i] >> ops[i + 1]
-- 
GitLab