Example DAGs

This section contains a few DAGs showing off some dbt pipelines to get you going.

Warning

All example DAGs are tested against apache-airflow==2.2.5. Some changes, such as modifying import statements or changing types, may be required for them to work in environments running other versions of Airflow.
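
If you need to support more than one Airflow version from the same DAG file, one common pattern is to fall back to the legacy import path when the newer one is unavailable. A minimal sketch, using PythonOperator as an example (its module moved from airflow.operators.python_operator to airflow.operators.python in Airflow 2):

# Prefer the Airflow 2 import path, fall back to the legacy module.
try:
    from airflow.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python_operator import PythonOperator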

Basic DAG

This basic DAG shows off a single DbtRunOperator that executes hourly:

basic_dag.py
"""Sample basic DAG which runs a dbt project."""
import datetime as dt

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator

with DAG(
    dag_id="example_basic_dbt_run",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=False,
    )
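
Before deploying, you may want to confirm the DAG file parses without import errors. A minimal sketch using Airflow's DagBag loader; the dags/ folder path is an assumption about where your DAG files live:

# Hypothetical parse check: load DAGs from a local dags/ folder and assert
# that the example DAG above imported cleanly.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags/", include_examples=False)
assert not dag_bag.import_errors
assert "example_basic_dbt_run" in dag_bag.dags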

Run and Docs from S3

This DAG shows off a DbtRunOperator followed by a DbtDocsGenerateOperator. Both execute hourly and run from dbt project files available at an S3 URL:

dbt_project_in_s3_dag.py
"""Sample basic DAG which showcases a dbt project being pulled from S3."""
import datetime as dt

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator, DbtRunOperator

with DAG(
    dag_id="example_basic_dbt_run_with_s3",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    # Project files will be pulled from "s3://my-bucket/dbt/project/key/prefix/"
    # and profiles from "s3://my-bucket/dbt/profiles/key/prefix/".
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        project_dir="s3://my-bucket/dbt/project/key/prefix/",
        profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=False,
    )

    # Documentation files (target/manifest.json, target/index.html, and
    # target/catalog.json) will be pushed back to S3 after compilation is done.
    dbt_docs = DbtDocsGenerateOperator(
        task_id="dbt_docs_hourly",
        project_dir="s3://my-bucket/dbt/project/key/prefix/",
        profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
    )

    dbt_run >> dbt_docs
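
Once the documentation files have been pushed back to S3, they can be fetched outside of Airflow, for example to inspect or serve them locally. A minimal sketch using boto3, assuming the files land under the project prefix shown above:

# Hypothetical: download the generated dbt docs page from S3 for local inspection.
import boto3

s3 = boto3.client("s3")
s3.download_file(
    "my-bucket",
    "dbt/project/key/prefix/target/index.html",
    "index.html",
)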

Complete dbt workflow

This DAG shows off an (almost) complete dbt workflow as it would be run from the CLI: we begin with a DbtSourceOperator to check the freshness of our source tables, followed by a DbtSeedOperator to load any static data. Then, two instances of DbtRunOperator are created: one to handle incremental models, and the other to run any non-incremental models. Finally, we run our tests to ensure our models remain correct.

complete_dbt_workflow_dag.py
"""Sample DAG showcasing a complete dbt workflow.

The complete workflow includes a sequence of source, seed, several run, and test commands.
"""
import datetime as dt

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtSourceOperator,
    DbtTestOperator,
)

with DAG(
    dag_id="example_complete_dbt_workflow",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_source = DbtSourceOperator(
        task_id="dbt_source",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
        do_xcom_push_artifacts=["sources.json"],
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    dbt_run_incremental = DbtRunOperator(
        task_id="dbt_run_incremental_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["tag:hourly,config.materialized:incremental"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=False,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated,config.materialized:incremental"],
        target="production",
        profile="my-project",
        full_refresh=True,
    )

    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    dbt_source >> dbt_seed >> dbt_run_incremental >> dbt_run >> dbt_test
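
Since dbt_source pushes sources.json to XCom (via do_xcom_push_artifacts), a downstream task could also inspect the freshness results. A minimal sketch of a callable you could wire up with a PythonOperator; it assumes dbt's sources.json artifact layout, where each entry in results carries unique_id and status fields:

# Hypothetical helper: fail the task if any source freshness check errored.
def check_source_freshness(**context):
    sources = context["ti"].xcom_pull(key="sources.json", task_ids="dbt_source")
    stale = [r["unique_id"] for r in sources["results"] if r["status"] == "error"]
    if stale:
        raise ValueError(f"Stale sources detected: {stale}")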

Using dbt artifacts

The following DAG showcases how to use dbt artifacts that are made available via XCom by airflow-dbt-python. A sample function calculates the longest-running dbt model by pulling the artifacts that were generated after DbtRunOperator executes. We specify which artifacts to push via the do_xcom_push_artifacts parameter.

use_dbt_artifacts_dag.py
"""Sample DAG to showcase pulling dbt artifacts from XCom."""
import datetime as dt

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator


def process_dbt_artifacts(**context):
    """Report which model or models took the longest to compile and execute."""
    run_results = context["ti"].xcom_pull(
        key="run_results.json", task_ids="dbt_run_daily"
    )
    longest_compile = None
    longest_execute = None

    for result in run_results["results"]:
        if result["status"] != "success":
            continue

        model_id = result["unique_id"]
        for timing in result["timing"]:
            duration = (
                dt.datetime.strptime(
                    timing["completed_at"], "%Y-%m-%dT%H:%M:%S.%fZ"
                )
                - dt.datetime.strptime(
                    timing["started_at"], "%Y-%m-%dT%H:%M:%S.%fZ"
                )
            ).total_seconds()

            if timing["name"] == "execute":
                if longest_execute is None or duration > longest_execute[1]:
                    longest_execute = (model_id, duration)

            elif timing["name"] == "compile":
                if longest_compile is None or duration > longest_compile[1]:
                    longest_compile = (model_id, duration)

    print(
        f"{longest_execute[0]} took the longest to execute with a time of "
        f"{longest_execute[1]} seconds!"
    )
    print(
        f"{longest_compile[0]} took the longest to compile with a time of "
        f"{longest_compile[1]} seconds!"
    )


with DAG(
    dag_id="example_dbt_artifacts",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=True,
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        python_callable=process_dbt_artifacts,
    )

    dbt_run >> process_artifacts