Commit b575cb24 authored by edvs19

test

parent 7a18db03
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.models.param import Param
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

repo_home = '/opt/airflow/dags/repo'

############# PROCESS A SINGLE RECORDS FILE ###############
with DAG(
    dag_id="process_records_teste",
    params={
        "input_file": Param("invalid_file.jsonl", type="string"),
    },
) as dag1:
    # The input file is resolved per run via Jinja templating; reading
    # dag1.params here at parse time would only ever yield the default.
    task_spark = SparkSubmitOperator(
        task_id="landing_to_bronze",
        application=f"{repo_home}/gen_bronze.py",
        application_args=["{{ params.input_file }}"],
        packages="org.apache.hadoop:hadoop-aws:3.3.4",
    )
    # Stage the dbt project in a writable scratch directory.
    task_dbt_setup = BashOperator(
        task_id="dbt_setup",
        bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
    )
    # Promote the data bronze -> silver -> gold with dbt.
    task_bronze_to_gold = BashOperator(
        task_id="bronze_to_silver_to_gold",
        cwd="/tmp/proj_teste",
        bash_command="dbt deps && dbt build",
    )
    task_spark >> task_dbt_setup >> task_bronze_to_gold
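
# A run of this DAG can override the default input file through its conf,
# e.g. from the Airflow CLI (the file name below is illustrative):
#
#   airflow dags trigger process_records_teste --conf '{"input_file": "records.jsonl"}'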
############# RUN TESTS DAG ##############
with DAG(
    dag_id="run_tests",
    params={
        "files": Param([], type="array"),
    },
) as dag2:
    # NOTE: params are read at parse time here, so this loop only sees the
    # default list; see the dynamic-task-mapping sketch below for a
    # run-time alternative.
    files = dag2.params["files"]
    dag2.log.info("Input files at parse time: %s", files)
    ops = []
    for file in files:
        op = TriggerDagRunOperator(
            task_id=file,
            trigger_dag_id=dag1.dag_id,
            conf={"input_file": file},
        )
        ops.append(op)
    # Chain the triggers so the files are processed sequentially.
    for i in range(len(ops) - 1):
        ops[i] >> ops[i + 1]
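
# Because DAG files are parsed without any run context, the loop above only
# sees the param's default empty list, so a run triggered with a non-empty
# "files" conf creates no trigger tasks. On Airflow 2.3+, dynamic task
# mapping expands at run time instead. A minimal sketch, assuming the
# imports above; dag and task names here are illustrative, and mapped
# triggers run in parallel rather than chained:

from airflow.decorators import task

with DAG(
    dag_id="run_tests_mapped",
    params={"files": Param([], type="array")},
) as dag3:

    @task
    def build_confs(**context):
        # Read the per-run param value at execution time, not parse time.
        return [{"input_file": f} for f in context["params"]["files"]]

    # One mapped trigger task instance per file in the run's conf.
    TriggerDagRunOperator.partial(
        task_id="trigger_process",
        trigger_dag_id="process_records_teste",
    ).expand(conf=build_confs())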