Commit 9a5c4f43 authored by edvs19

teste

parent 4d9f0814
@@ -51,28 +51,40 @@
 from airflow import DAG
 from airflow.decorators import task
 from airflow.models.param import Param
 
-with DAG(
-    "the_dag",
-    params={
-        "x": Param(5, type="integer", minimum=3),
-        "my_int_param": 6
-    },
-) as dag:
-
-    @task.python
-    def example_task(params: dict):
-        # This will print the default value, 6:
-        dag.log.info(dag.params['my_int_param'])
-
-        # This will print the manually-provided value, 42:
-        dag.log.info(params['my_int_param'])
-
-        # This will print the default value, 5, since it wasn't provided manually:
-        dag.log.info(params['x'])
-
-    example_task()
-
-if __name__ == "__main__":
-    dag.test(
-        run_conf={"my_int_param": 42}
-    )
+@dag(
+    dag_id="process_records_teste",
+    params={
+        "input_file": Param(type="string"),
+    },
+)
+def process_records():
+
+    @task
+    def task_spark(params: dict):
+        return SparkSubmitOperator(
+            task_id=f"landing_to_bronze",
+            application=f"{repo_home}/gen_bronze.py",
+            application_args=[
+                params["input_file"]
+            ],
+            packages="org.apache.hadoop:hadoop-aws:3.3.4"
+        )
+
+    @task
+    def task_dbt_setup(params: dict):
+        return BashOperator(
+            task_id=f"dbt_setup",
+            bash_command=f"rm -rf /tmp/proj_teste && cp -r {repo_home}/proj_teste /tmp/proj_teste",
+        )
+
+    @task
+    def task_bronze_to_gold(params: dict):
+        return BashOperator(
+            task_id=f"bronze_to_silver_to_gold",
+            cwd="/tmp/proj_teste",
+            bash_command="dbt deps && dbt build",
+        )
+
+    task_spark() >> task_dbt_setup() >> task_bronze_to_gold()
+
+dag = process_records()
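
Note on trying out the change: the commit removes the old if __name__ == "__main__" harness together with the example params. A minimal sketch of how the reworked DAG could still be exercised locally, assuming Airflow 2.5+ (which provides DAG.test) and a made-up input path, might look like:

# Not part of this commit: hedged local-test sketch for the new DAG.
# The S3 path below is a placeholder; any string satisfying the
# "input_file" Param would do.
if __name__ == "__main__":
    dag.test(run_conf={"input_file": "s3://landing/example.csv"})

The same parameter can also be supplied at trigger time, for example with: airflow dags trigger process_records_teste --conf '{"input_file": "..."}'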