Introduction
airflow-dbt-python is a Python library that contains a collection of Airflow operators, hooks, and utilities to integrate with dbt.
Use airflow-dbt-python to run your dbt transformation pipelines end-to-end, as it offers operators for most dbt commands. Each of these airflow-dbt-python operators exposes the same arguments you would use with the dbt CLI, which makes migrating easy. Moreover, you can take advantage of Airflow connections to configure your dbt targets.
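For instance, a dbt CLI invocation translates almost one-to-one into operator arguments. The following minimal sketch (the selectors and target name are illustrative, borrowed from the examples later in this page) shows the rough equivalence:

# The CLI command:
#   dbt run --select +tag:daily --exclude tag:deprecated --target production --full-refresh
# roughly corresponds to:
from airflow_dbt_python.operators.dbt import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id="dbt_run_daily",
    select=["+tag:daily"],
    exclude=["tag:deprecated"],
    target="production",
    full_refresh=True,
)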
Features
airflow-dbt-python aims to make dbt a first-class citizen of Airflow by supporting additional features that integrate both tools. As you would expect, airflow-dbt-python can run all your dbt workflows in Airflow with the same results that you are used to from the CLI, but it is not a mere wrapper: airflow-dbt-python interfaces directly with the internals of dbt-core instead of running the CLI in, for example, a BashOperator.
We believe Airflow can enhance a dbt user’s experience with several additional features that leverage Airflow as much as possible:
Configuring dbt connections with Airflow connections.
Downloading dbt projects from remote storage, like AWS S3 or GitHub repositories.
Communicating between tasks by pushing results and artifacts to XCom.
Can you think of another way Airflow can enhance dbt? Let us know in a GitHub issue!
Read along for a breakdown of airflow-dbt-python’s main features, or head over to getting_started to get your dbt workflows running in Airflow!
Download dbt files from S3
The dbt parameters profiles_dir and project_dir would normally point to a directory containing a profiles.yml file and a dbt project in the local environment, respectively (the latter defined by the presence of a dbt_project.yml file). airflow-dbt-python extends these parameters to also accept an AWS S3 URL (identified by an s3:// scheme):
If an S3 URL is used for profiles_dir, then this URL must point to a directory in S3 that contains a profiles.yml file. The profiles.yml file will be downloaded and made available for the operator to use when running.
If an S3 URL is used for project_dir, then this URL must point to a directory in S3 containing all the files required for a dbt project to run. All of the contents of this directory will be downloaded and made available for the operator. The URL may also point to a zip file containing all the files of a dbt project, which will be downloaded, uncompressed, and made available for the operator.
This feature is intended to work in line with Airflow's description of the task concept.
In our world, that means each task should be responsible for fetching all the dbt-related files it needs in order to run independently, as already described in Temporary directories ensure task independence.
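As a minimal sketch of this feature (the bucket and prefixes below are hypothetical), pointing either parameter at an S3 URL is all that is required for the operator to fetch the files before running:

from airflow_dbt_python.operators.dbt import DbtRunOperator

dbt_run_from_s3 = DbtRunOperator(
    task_id="dbt_run_from_s3",
    # Hypothetical S3 locations: the project prefix (or a zip file) must contain
    # dbt_project.yml, and the profiles prefix must contain profiles.yml.
    project_dir="s3://my-bucket/dbt/project/",
    profiles_dir="s3://my-bucket/dbt/profiles/",
    select=["+tag:daily"],
)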
Push dbt artifacts to XCom
Each dbt execution produces one or more JSON artifacts that are valuable for producing meta-metrics, building conditional workflows, reporting, and other uses. airflow-dbt-python can push these artifacts to XCom as requested via the do_xcom_push_artifacts parameter, which takes a list of artifacts to push.
This way, artifacts may be pulled and operated on by downstream tasks. For example:
import datetime as dt

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator

with DAG(
    dag_id="example_dbt_artifacts",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=True,
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    def process_dbt_artifacts(*args, **kwargs):
        # Do processing
        pass

    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        python_callable=process_dbt_artifacts,
        provide_context=True,
    )

    dbt_run >> process_artifacts
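The process_dbt_artifacts callable could then pull the artifacts pushed by dbt_run_daily. The sketch below assumes each artifact is available in XCom under a key matching its file name, as the do_xcom_push_artifacts list above suggests; the fields read from run_results.json ("results", "status", "unique_id") come from dbt's artifact schema:

def process_dbt_artifacts(ti, **kwargs):
    # Assumed: the run_results.json artifact was pushed under the key "run_results.json".
    run_results = ti.xcom_pull(task_ids="dbt_run_daily", key="run_results.json")
    if run_results is not None:
        for result in run_results.get("results", []):
            print(result.get("unique_id"), result.get("status"))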
Use Airflow connections as dbt targets (without a profiles.yml)
Airflow connections allow users to manage and store connection information, such as hostname, port, user name, and password, for operators to use when accessing certain applications, like databases. Similarly, a dbt profiles.yml file stores connection information under each target key.
airflow-dbt-python bridges the gap between the two and allows you to use connection information stored as an Airflow connection by specifying the connection id as the target parameter of any of the dbt operators it provides. What's more, if using an Airflow connection, the profiles.yml file may be entirely omitted (although keep in mind that a profiles.yml file contains a configuration block besides target connection information).
import datetime as dt
import json

from airflow import DAG, settings
from airflow.models.connection import Connection
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator

# For illustration purposes, and to keep the example self-contained, we create
# a Connection using Airflow's ORM. However, any method of loading connections would
# work, like Airflow's UI, Airflow's CLI, or in deployment scripts.
my_conn = Connection(
    conn_id="my_db_connection",
    conn_type="postgres",
    description="A test postgres connection",
    host="localhost",
    login="username",
    port=5432,
    schema="my_dbt_schema",
    password="password",  # pragma: allowlist secret
    # Other dbt parameters can be added as extras
    extra=json.dumps(dict(threads=4, sslmode="require")),
)
session = settings.Session()
session.add(my_conn)
session.commit()

with DAG(
    dag_id="example_airflow_connection",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        target="my_db_connection",
        # Profiles file is not needed as we are using an Airflow connection.
        # If a profiles file is used, the Airflow connection will be merged with the
        # existing targets.
        profiles_dir=None,  # Defaults to None so this may be omitted.
        project_dir="/path/to/my/dbt/project/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated"],
    )