Example DAGs
This section contains a few DAGs showing off some dbt pipelines to get you going.
Warning
All example DAGs are tested against apache-airflow==2.2.5. Some changes, such as modifying import statements or changing types, may be required for them to work in environments running other versions of Airflow.
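For example, airflow.utils.dates.days_ago is deprecated in more recent Airflow releases in favor of a static start_date. A minimal sketch of that adjustment, assuming a newer Airflow installation (pendulum ships with Airflow):

# Sketch: adapting the examples to newer Airflow releases, where
# airflow.utils.dates.days_ago is deprecated in favor of a static start_date.
import pendulum

from airflow import DAG

with DAG(
    dag_id="example_basic_dbt_run",
    schedule_interval="0 * * * *",
    # A fixed, timezone-aware start date replaces days_ago(1).
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    ...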
Basic DAG
This basic DAG shows off a single DbtRunOperator that executes hourly. The select argument uses dbt's node selection syntax: +tag:hourly selects all resources tagged hourly together with their upstream dependencies, while exclude leaves out anything tagged deprecated:
1"""Sample basic DAG which dbt runs a project."""
2import datetime as dt
3
4from airflow import DAG
5from airflow.utils.dates import days_ago
6from airflow_dbt_python.operators.dbt import DbtRunOperator
7
8with DAG(
9 dag_id="example_basic_dbt_run",
10 schedule_interval="0 * * * *",
11 start_date=days_ago(1),
12 catchup=False,
13 dagrun_timeout=dt.timedelta(minutes=60),
14) as dag:
15 dbt_run = DbtRunOperator(
16 task_id="dbt_run_hourly",
17 project_dir="/path/to/my/dbt/project/",
18 profiles_dir="~/.dbt/",
19 select=["+tag:hourly"],
20 exclude=["tag:deprecated"],
21 target="production",
22 profile="my-project",
23 full_refresh=False,
24 )
Run and Docs from S3
This DAG shows off a DbtRunOperator followed by a DbtDocsGenerateOperator. Both execute hourly, running from dbt project files available at an S3 URL:
1"""Sample basic DAG which showcases a dbt project being pulled from S3."""
2import datetime as dt
3
4from airflow import DAG
5from airflow.utils.dates import days_ago
6from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator, DbtRunOperator
7
8with DAG(
9 dag_id="example_basic_dbt_run_with_s3",
10 schedule_interval="0 * * * *",
11 start_date=days_ago(1),
12 catchup=False,
13 dagrun_timeout=dt.timedelta(minutes=60),
14) as dag:
15 # Project files will be pulled from "s3://my-bucket/dbt/profiles/key/prefix/"
16 dbt_run = DbtRunOperator(
17 task_id="dbt_run_hourly",
18 project_dir="s3://my-bucket/dbt/project/key/prefix/",
19 profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
20 select=["+tag:hourly"],
21 exclude=["tag:deprecated"],
22 target="production",
23 profile="my-project",
24 full_refresh=False,
25 )
26
27 # Documentation files (target/manifest.json, target/index.html, and
28 # target/catalog.json) will be pushed back to S3 after compilation is done.
29 dbt_docs = DbtDocsGenerateOperator(
30 task_id="dbt_run_hourly",
31 project_dir="s3://my-bucket/dbt/project/key/prefix/",
32 profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
33 )
34
35 dbt_run >> dbt_docs
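Once the documentation files have been pushed back to S3, downstream consumers can retrieve them with the Amazon provider's S3Hook. A minimal sketch, assuming the hypothetical bucket and key prefix used above and the default aws_default connection:

# Sketch: reading the generated dbt docs page back from S3. The bucket and
# key below are hypothetical and match the prefix used in the DAG above.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def fetch_dbt_docs_index() -> str:
    """Return the contents of the generated dbt docs index page."""
    hook = S3Hook(aws_conn_id="aws_default")
    return hook.read_key(
        key="dbt/project/key/prefix/target/index.html",
        bucket_name="my-bucket",
    )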
Complete dbt workflow
This DAG shows off an (almost) complete dbt workflow as it would be run from the CLI: we begin by running DbtSourceOperator to check the freshness of our source tables, followed by DbtSeedOperator to load up any static data. Then, two instances of DbtRunOperator are created: one to handle incremental models, and the other to run any non-incremental models. Finally, we run our tests to ensure our models remain correct. Note that a comma inside a single selector means intersection: tag:hourly,config.materialized:incremental matches only models that are both tagged hourly and materialized incrementally.
1"""Sample DAG showcasing a complete dbt workflow.
2
3The complete workflow includes a sequence of source, seed, and several run commands.
4"""
5import datetime as dt
6
7from airflow import DAG
8from airflow.utils.dates import days_ago
9from airflow_dbt_python.operators.dbt import (
10 DbtRunOperator,
11 DbtSeedOperator,
12 DbtSourceOperator,
13 DbtTestOperator,
14)
15
16with DAG(
17 dag_id="example_complete_dbt_workflow",
18 schedule_interval="0 * * * *",
19 start_date=days_ago(1),
20 catchup=False,
21 dagrun_timeout=dt.timedelta(minutes=60),
22) as dag:
23 dbt_source = DbtSourceOperator(
24 task_id="dbt_run_incremental_hourly",
25 project_dir="/path/to/my/dbt/project/",
26 profiles_dir="~/.dbt/",
27 target="production",
28 profile="my-project",
29 do_xcom_push_artifacts=["sources.json"],
30 )
31
32 dbt_seed = DbtSeedOperator(
33 task_id="dbt_seed",
34 project_dir="/path/to/my/dbt/project/",
35 profiles_dir="~/.dbt/",
36 target="production",
37 profile="my-project",
38 )
39
40 dbt_run_incremental = DbtRunOperator(
41 task_id="dbt_run_incremental_hourly",
42 project_dir="/path/to/my/dbt/project/",
43 profiles_dir="~/.dbt/",
44 select=["tag:hourly,config.materialized:incremental"],
45 exclude=["tag:deprecated"],
46 target="production",
47 profile="my-project",
48 full_refresh=False,
49 )
50
51 dbt_run = DbtRunOperator(
52 task_id="dbt_run_hourly",
53 project_dir="/path/to/my/dbt/project/",
54 profiles_dir="~/.dbt/",
55 select=["+tag:hourly"],
56 exclude=["tag:deprecated,config.materialized:incremental"],
57 target="production",
58 profile="my-project",
59 full_refresh=True,
60 )
61
62 dbt_test = DbtTestOperator(
63 task_id="dbt_test",
64 project_dir="/path/to/my/dbt/project/",
65 profiles_dir="~/.dbt/",
66 target="production",
67 profile="my-project",
68 )
69
70 dbt_source >> dbt_seed >> dbt_run_incremental >> dbt_run >> dbt_test
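Since dbt_source pushes the sources.json artifact to XCom, a downstream task could react to stale sources. A minimal sketch, assuming the standard dbt sources.json schema in which each freshness result reports a status of pass, warn, or error:

# Sketch: consuming the sources.json artifact pushed to XCom by dbt_source.
# Assumes the standard dbt sources.json schema, where each entry in
# "results" carries a "unique_id" and a freshness "status".
def report_stale_sources(**context):
    """Log any sources whose freshness checks did not pass."""
    sources = context["ti"].xcom_pull(key="sources.json", task_ids="dbt_source")

    stale = [
        result["unique_id"]
        for result in sources["results"]
        if result["status"] != "pass"
    ]
    if stale:
        print(f"Sources with freshness warnings or errors: {', '.join(stale)}")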
Using dbt artifacts
The following DAG showcases how to use the dbt artifacts that airflow-dbt-python makes available via XCom. A sample function calculates the longest-running dbt model by pulling the artifacts generated after DbtRunOperator executes. We specify which dbt artifacts to push via the do_xcom_push_artifacts parameter.
1"""Sample DAG to showcase pulling dbt artifacts from XCOM."""
2import datetime as dt
3
4from airflow import DAG
5from airflow.operators.python_operator import PythonOperator
6from airflow.utils.dates import days_ago
7from airflow_dbt_python.operators.dbt import DbtRunOperator
8
9
10def process_dbt_artifacts(**context):
11 """Report which model or models took the longest to compile and execute."""
12 run_results = context["ti"].xcom_pull(
13 key="run_results.json", task_ids="dbt_run_daily"
14 )
15 longest_compile = None
16 longest_execute = None
17
18 for result in run_results["results"]:
19 if result["status"] != "success":
20 continue
21
22 model_id = result["unique_id"]
23 for timing in result["timing"]:
24 duration = (
25 dt.datetime.strptime(
26 timing["started_at"], format="%Y-%m-%dT%H:%M:%S.%fZ"
27 )
28 - dt.datetime.strptime(
29 timing["completed_at"], format="%Y-%m-%dT%H:%M:%S.%fZ"
30 )
31 ).total_seconds()
32
33 if timing["name"] == "execute":
34 if longest_execute is None or duration > longest_execute[1]:
35 longest_execute = (model_id, duration)
36
37 elif timing["name"] == "compile":
38 if longest_compile is None or duration > longest_compile[1]:
39 longest_compile = (model_id, duration)
40
41 print(
42 f"{longest_execute[0]} took the longest to execute with a time of "
43 f"{longest_execute[1]} seconds!"
44 )
45 print(
46 f"{longest_compile[0]} took the longest to compile with a time of "
47 f"{longest_compile[1]} seconds!"
48 )
49
50with DAG(
51 dag_id="example_dbt_artifacts",
52 schedule_interval="0 0 * * *",
53 start_date=days_ago(1),
54 catchup=False,
55 dagrun_timeout=dt.timedelta(minutes=60),
56) as dag:
57 dbt_run = DbtRunOperator(
58 task_id="dbt_run_daily",
59 project_dir="/path/to/my/dbt/project/",
60 profiles_dir="~/.dbt/",
61 select=["+tag:daily"],
62 exclude=["tag:deprecated"],
63 target="production",
64 profile="my-project",
65 full_refresh=True,
66 do_xcom_push_artifacts=["manifest.json", "run_results.json"],
67 )
68
69 process_artifacts = PythonOperator(
70 task_id="process_artifacts",
71 python_callable=process_dbt_artifacts,
72 provide_context=True,
73 )
74
75 dbt_run >> process_artifacts
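If per-phase timing is not needed, the timestamp parsing above can be skipped: in the standard run_results.json schema each result also carries an aggregate execution_time in seconds (an assumption about the artifact schema, not something the operator guarantees). A simpler variant:

# Sketch: a simpler variant of process_dbt_artifacts, assuming each entry in
# run_results["results"] carries an aggregate "execution_time" in seconds.
def report_slowest_model(**context):
    """Log the model with the highest total execution time."""
    run_results = context["ti"].xcom_pull(
        key="run_results.json", task_ids="dbt_run_daily"
    )
    slowest = max(run_results["results"], key=lambda r: r["execution_time"])
    print(
        f"{slowest['unique_id']} took {slowest['execution_time']:.2f} seconds in total."
    )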