Introduction

airflow-dbt-python is a Python library that contains a collection of Airflow operators, hooks, and utilities to integrate with dbt.

Use airflow-dbt-python to run your dbt transformation pipelines end-to-end, as it offers operators for most dbt commands. Each airflow-dbt-python operator exposes the same arguments you would use with the dbt CLI, which makes migrating existing projects straightforward. Moreover, you can take advantage of Airflow connections to configure your dbt targets.
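
For instance, a CLI invocation like dbt run --select my_model --target production corresponds roughly to the following operator (a minimal sketch; the task_id and project_dir are placeholders):

from airflow_dbt_python.operators.dbt import DbtRunOperator

# Roughly equivalent to: dbt run --select my_model --target production
dbt_run = DbtRunOperator(
    task_id="dbt_run",
    project_dir="/path/to/my/dbt/project/",
    select=["my_model"],
    target="production",
)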

Features

airflow-dbt-python aims to make dbt a first-class citizen of Airflow by supporting additional features that integrate both tools. As you would expect, airflow-dbt-python can run all your dbt workflows in Airflow with the same results you are used to from the CLI, but without being a mere wrapper: airflow-dbt-python directly interfaces with the internals of dbt-core instead of running the CLI in, for example, a BashOperator.

We believe Airflow can enhance a dbt user’s experience with several additional features that leverage Airflow as much as possible:

  • Configuring dbt connections with Airflow connections.

  • Downloading dbt projects from remote storage, such as AWS S3 or GitHub repositories.

  • Communicating between tasks by pushing results and artifacts to XCom.

Can you think of another way Airflow can enhance dbt? Let us know in a GitHub issue!

Read along for a breakdown of airflow-dbt-python’s main features, or head over to getting_started to get your dbt workflows running in Airflow!

Download dbt files from S3

The dbt parameters profiles_dir and project_dir would normally point, respectively, to a local directory containing a profiles.yml file and to a local dbt project (defined by the presence of a dbt_project.yml file). airflow-dbt-python extends these parameters to also accept an AWS S3 URL (identified by an s3:// scheme), as illustrated by the sketch after this list:

  • If an S3 URL is used for profiles_dir, then this URL must point to a directory in S3 that contains a profiles.yml file. The profiles.yml file will be downloaded and made available for the operator to use when running.

  • If an S3 URL is used for project_dir, then this URL must point to a directory in S3 containing all the files required for a dbt project to run. All of the contents of this directory will be downloaded and made available for the operator. The URL may also point to a zip file containing all the files of a dbt project, which will be downloaded, uncompressed, and made available for the operator.
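
For example, both parameters may point to S3 (the bucket name and key prefixes below are placeholders for illustration):

from airflow_dbt_python.operators.dbt import DbtRunOperator

# Both the profiles.yml file and the dbt project files are downloaded
# from S3 before the dbt command runs.
dbt_run = DbtRunOperator(
    task_id="dbt_run_from_s3",
    project_dir="s3://my-bucket/dbt/project/",
    profiles_dir="s3://my-bucket/dbt/profiles/",
    select=["+tag:daily"],
)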

This feature is intended to work in line with Airflow’s description of the task concept:

Tasks don’t pass information to each other by default, and run entirely independently.

In our world, that means each task should be responsible for fetching all the dbt-related files it needs in order to run independently, as already described in Temporary directories ensure task independence.

Push dbt artifacts to XCom

Each dbt execution produces one or more JSON artifacts that are valuable for producing meta-metrics, building conditional workflows, reporting, and other uses. airflow-dbt-python can push these artifacts to XCom as requested via the do_xcom_push_artifacts parameter, which takes a list of artifact names to push.

This way, artifacts may be pulled and operated on by downstream tasks. For example:

example_dbt_artifacts_dag.py
import datetime as dt

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator

with DAG(
    dag_id="example_dbt_artifacts",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=True,
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    def process_dbt_artifacts(*args, **kwargs):
        # The pushed artifacts can be pulled from XCom, for example:
        # manifest = kwargs["ti"].xcom_pull(
        #     task_ids="dbt_run_daily", key="manifest.json"
        # )
        # Do processing
        pass

    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        python_callable=process_dbt_artifacts,
        provide_context=True,
    )

    dbt_run >> process_artifacts

Use Airflow connections as dbt targets (without a profiles.yml)

Airflow connections allow users to manage and store connection information, such as hostname, port, user name, and password, for operators to use when accessing certain applications, like databases. Similarly, a dbt profiles.yml file stores connection information under each target key.

airflow-dbt-python bridges the gap between the two and allows you to use connection information stored as an Airflow connection by specifying the connection id as the target parameter of any of the dbt operators it provides. What's more, if an Airflow connection is used, the profiles.yml file may be omitted entirely (although keep in mind that a profiles.yml file can contain configuration blocks besides target connection information).

airflow_connection_target_dag.py
import datetime as dt
import json

from airflow import DAG, settings
from airflow.models.connection import Connection
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator

# For illustration purposes, and to keep the example self-contained, we create
# a Connection using Airflow's ORM. However, any method of loading connections
# would work, like Airflow's UI, Airflow's CLI, or deployment scripts.
my_conn = Connection(
    conn_id="my_db_connection",
    conn_type="postgres",
    description="A test postgres connection",
    host="localhost",
    login="username",
    port=5432,
    schema="my_dbt_schema",
    password="password",  # pragma: allowlist secret
    # Other dbt parameters can be added as extras
    extra=json.dumps(dict(threads=4, sslmode="require")),
)

session = settings.Session()
session.add(my_conn)
session.commit()

with DAG(
    dag_id="example_airflow_connection",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        target="my_db_connection",
        # A profiles.yml file is not needed as we are using an Airflow connection.
        # If a profiles.yml file is used, the Airflow connection will be merged
        # into the existing targets.
        profiles_dir=None,  # Defaults to None, so this may be omitted.
        project_dir="/path/to/my/dbt/project/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated"],
    )