Skip to content
Snippets Groups Projects
Commit 1003c213 authored by edvs19's avatar edvs19
Browse files

teste

parent 3570e95a
Branches
No related tags found
No related merge requests found
import datetime
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.models.param import Param
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
repo_home = '/opt/airflow/dags/repo'
@dag(
params={
"input_file" : Param(
"nofile.jsonl",
"jsonl to be used as input in landing to bronze",
str
)
}
)
def process_records(params: dict):
"""
DAG que realiza o fluxo landing -> bronze -> silver -> gold
"""
task_1 = SparkSubmitOperator(
application=f"{repo_home}/gen_bronze.py",
task_id="landing_to_bronze",
packages = "org.apache.hadoop:hadoop-aws:3.3.4"
)
task_2 = BashOperator(
task_id="dbt_setup",
bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
)
task_3 = BashOperator(
task_id="bronze_to_silver_to_gold",
cwd="/tmp/proj_teste",
bash_command="dbt deps && dbt build",
)
task_1 >> task_2 >> task_3
@dag
def teste(
params={
"input_file" : Param(type=str)
}
):
print(params.input_file)
@dag
def run_tests():
dgr = TriggerDagRunOperator(
trigger_dag_id="other_dag",
task_id="teste",
conf={"file_path": "{{ params.file_path }}"}
)
dag2 = teste()
# dag3 = run_tests()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment