Getting started

This section guides you on installing airflow-dbt-python and getting your first dbt workflow DAG running.

Requirements

Before using airflow-dbt-python, ensure you meet the following requirements: * A dbt project using dbt-core version 1.0.0 or later. * An Airflow environment using version 2.2 or later.

  • If using any managed service, like AWS MWAA, ensure your environment is created with a supported version of Airflow.

  • If self-hosting, Airflow installation instructions can be found in their official documentation.

  • Running Python 3.7 or later in your Airflow environment.

Warning

Even though we don’t impose any upper limits on versions of Airflow and dbt, it’s possible that new versions are not supported immediately after release, particularly for dbt. We recommend testing the latest versions before upgrading and reporting any issues.

Note

Older versions of Airflow and dbt may work with airflow-dbt-python, although we cannot guarantee this. Our testing pipeline runs the latest dbt-core with the latest Airflow release, and the latest version supported by AWS MWAA.

Installation

Your installation will vary according to your specific Airflow environment setup. These instructions cover a general approach by installing from PyPI or the GitHub repository, and how to install it in AWS MWAA. Other serviced offerings may require different steps, check the documentation of your managed service.

From PyPI

airflow-dbt-python is available in PyPI and can be installed with pip:

pip install airflow-dbt-python

As a convenience, some dbt adapters can be installed by specifying extras. For example, if requiring the dbt-redshift adapter:

pip install airflow-dbt-python[redshift]

Or dbt-snowflake:

pip install airflow-dbt-python[snowflake]

Note

These dbt adapter extras are provided as a convenience. Any required dbt adapters can also be installed separatedly. Refer to the dbt documentation for a list of supported adapters and how to install them.

Building from source

airflow-dbt-python can also be built from source by cloning the GitHub repository:

git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python

And installing with Poetry:

poetry install

Any extra dbt adapters can be installed by specifying extras:

poetry install -E postgres -E redshift -E bigquery -E snowflake
poetry install -E all

Installing in MWAA

airflow-dbt-python can be installed in an Airflow environment managed by AWS via their Managed Workflows for Apache Airflow service.

To do so, include airflow-dbt-python in the requirements.txt file provided to MWAA, for example:

requirements.txt
airflow-dbt-python[redshift,s3]

Installs airflow-dbt-python, dbt-redshift adapter, and all required libraries to support dbt S3 remotes.

Alternatively, airflow-dbt-python can also be provided to AWS MWAA via a plugins.zip file. This can be achieved by adding an airflow-dbt-python wheel to your plugins.zip

For example, we can start by cloning the GitHub repository:

git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python

Then building an airflow-dbt-python wheel using poetry:

poetry build -f wheel

The wheel file can now be added to your plugins.zip, and the requirements can be updated to point to this wheel file:

requirements.txt
/usr/local/airflow/plugins/airflow_dbt_python-1.0.0-py3-none-any.whl

Accessing a dbt project

airflow-dbt-python needs a way to access your dbt project to run. The requirements to grant this access will depend on how your Airflow environment is setup:

  1. Using a local executor with a single-machine installation means we can rely on the local machine’s filesystem to store a dbt project. This also applies to DebugExecutor and SequentialExecutor, but these executors are generally only used for debugging/development so we will ignore them. If you are running a setup like this, then simply ensure your dbt project and profiles.yml exist somewhere in the LocalExecutor’s file system.

  2. Once your setup has evolved to a multi-machine/cloud installation with any remote executor, we must rely on a remote storage for dbt files. Currently, supported remote storages include AWS S3 and git remote repositories although more are in plans to be added. In this setup, your dbt project will need to be uploaded to a remote storage that Airflow can access. airflow-dbt-python can utilize Airflow connections to access these storages.

Single-machine installation

As we can rely on the local machine’s filesystem, simply copy or move your dbt project profiles.yml to a path in the instance executing Airflow.

Files may be laid out as:

.
|-- ~/.dbt/
|   `-- profiles.yml
`-- /path/to/project/
    |-- dbt_project.yml
    |-- models/
    |   |-- model1.sql
    |   `-- model2.sql
    |-- seeds/
    |   |-- seed1.csv
    |   `-- seed2.csv
    |-- macros/
    |   |-- macro1.csv
    |   `-- macro2.csv
    `-- tests/
        |-- test1.sql
        `-- test2.sql

Then we can simply set project_dir and profiles_dir to "/path/to/project/" and "~/.dbt/" respectively:

example_local_1_dag.py
 1import datetime as dt
 2
 3import pendulum
 4from airflow import DAG
 5from airflow_dbt_python.operators.dbt import DbtRunOperator
 6
 7with DAG(
 8    dag_id="example_local_1",
 9    schedule_interval="0 0 * * *",
10    start_date=pendulum.today("UTC").add(days=-1),
11    catchup=False,
12    dagrun_timeout=dt.timedelta(minutes=60),
13) as dag:
14    dbt_run = DbtRunOperator(
15        task_id="dbt_run_daily",
16        project_dir="/path/to/project",
17        profiles_dir="~/.dbt/",
18        select=["+tag:daily"],
19        exclude=["tag:deprecated"],
20        target="production",
21        profile="my-project",
22   )

Note

Setting profiles_dir to "~/.dbt/" can be omitted as this is the default value.

If we have multiple operators, we can also utilize default arguments and include other parameters like the profile and target to use:

example_local_2_dag.py
 1import datetime as dt
 2
 3import pendulum
 4from airflow import DAG
 5from airflow_dbt_python.operators.dbt import DbtRunOperator, DbtSeedOperator
 6
 7default_args = {
 8   "project_dir": "/path/to/project/",
 9   "profiles_dir": "~/.dbt/",
10   "target": "production",
11   "profile": "my-project",
12}
13
14with DAG(
15    dag_id="example_local_2",
16    schedule_interval="0 0 * * *",
17    start_date=pendulum.today("UTC").add(days=-1),
18    catchup=False,
19    dagrun_timeout=dt.timedelta(minutes=60),
20    default_args=default_args,
21) as dag:
22    dbt_seed = DbtSeedOperator(
23        task_id="dbt_seed",
24    )
25
26    dbt_run = DbtRunOperator(
27        task_id="dbt_run_daily",
28        select=["+tag:daily"],
29        exclude=["tag:deprecated"],
30    )
31
32    dbt_seed >> dbt_run

Note

dbt supports configuration via environment variables, which may also be used. Additionally, profile and target may be omitted if already specified in dbt_project.yml and profiles.yml respectively.

Multi-machine/cloud installation

When Airflow is installed is running on a multi- machine or cloud installation, each individual worker does not have does not have access to a common filesystem that we can reliably use to store dbt project files (at least, assuming any deployment with more than one worker). This includes both self-hosted deployments as well as managed Airflow deployments like AWS MWAA or Astronomer.

For these deployments we must rely on a dbt remote to download and, eventually, upload all required dbt files. The remote dbt URL may be used in place of a local project_dir or profiles_dir to have airflow-dbt-python download the dbt files in the remote into a temporary directory for execution.

Interactions with storages are supported by subclasses of DbtRemoteHook. Read the documentation dbt remote hooks to learn more about these hooks.

As an example, let’s upload our dbt project to an AWS S3 bucket. The files may end up structured in the bucket as:

s3://my-bucket/
.
|-- profiles/
|   `-- profiles.yml
`-- project/
    |-- dbt_project.yml
    |-- models/
    |   |-- model1.sql
    |   `-- model2.sql
    |-- seeds/
    |   |-- seed1.csv
    |   `-- seed2.csv
    |-- macros/
    |   |-- macro1.csv
    |   `-- macro2.csv
    `-- tests/
        |-- test1.sql
        `-- test2.sql

Then, we can alter the previous example DAG to set project_dir and profiles_dir to "s3://my-bucket/project/" and "s3://my-bucket/profiles/" respectively:

example_s3_remote_1_dag.py
 1import datetime as dt
 2
 3import pendulum
 4from airflow import DAG
 5from airflow_dbt_python.operators.dbt import DbtRunOperator
 6
 7with DAG(
 8    dag_id="example_s3_remote_1",
 9    schedule_interval="0 0 * * *",
10    start_date=pendulum.today("UTC").add(days=-1),
11    catchup=False,
12    dagrun_timeout=dt.timedelta(minutes=60),
13) as dag:
14    dbt_run = DbtRunOperator(
15        task_id="dbt_run_daily",
16        project_dir="s3://my-bucket/project/",
17        profiles_dir="s3://my-bucket/profiles/",
18        select=["+tag:daily"],
19        exclude=["tag:deprecated"],
20        target="production",
21        profile="my-project",
22   )

airflow-dbt-python uses the URL scheme (in this example, "s3") to figure out the type of remote, and the corresponding DbtRemoteHook to download all required files. An exception would be raised if the scheme does not point to a supported remote.

airflow-dbt-python takes care of adjusting any path-like arguments so that they are pointing to files in a local temporary directory once all the dbt files are download from the remote storage.

Let’s do another example where we upload our dbt project to a GitHub repository. For this example, let’s use dbt-labs’ own jaffle_shop.

The DAG looks the same as the AWS S3 example, except that now we use the GitHub repository’s SSH URL as the project_dir argument:

example_git_remote_1_dag.py
 1import datetime as dt
 2
 3import pendulum
 4from airflow import DAG
 5from airflow_dbt_python.operators.dbt import DbtRunOperator
 6
 7with DAG(
 8    dag_id="example_git_remote_1",
 9    schedule_interval="0 0 * * *",
10    start_date=pendulum.today("UTC").add(days=-1),
11    catchup=False,
12    dagrun_timeout=dt.timedelta(minutes=60),
13) as dag:
14    dbt_run = DbtRunOperator(
15        task_id="dbt_run_daily",
16        project_dir="git+ssh://github.com:dbt-labs/jaffle-shop-classic",
17        select=["+tag:daily"],
18        exclude=["tag:deprecated"],
19        target="my_warehouse_connection",
20        profile="my-project",
21   )

airflow-dbt-python can determine this URL requires a DbtGitRemoteHook by looking at the URL’s scheme ("git+ssh"). As we are passing an SSH URL, DbtGitRemoteHook can utilize an Airflow SSH Connection as it subclasses Airflow’s SSHHook. This connection type allows us to setup the necessary SSH keys to access GitHub. Of course, as this is a public repository, we could have just used an HTTP URL, but for private repositories an SSH key may be required.

Note

airflow-dbt-python can utilize Airflow Connections to fetch connection details for dbt remotes as well as for dbt targets (e.g. for your data warehouse). The project_conn_id and profiles_conn_id arguments that all dbt operators have refer to Airflow Connections to used to fetch dbt projects and profiles.yml respectively, whereas the target argument can point to an Airflow Connection used to setup dbt to access your data warehouse.

Notice we are omitting the profiles_dir argument as the jaffle_shop repo doesn’t include a profiles.yml file we can use. When we omit profiles_dir, airflow-dbt-python will attempt to find dbt connection details in one of two places:

  1. First, it will check if the project_dir URL already includes a profiles.yml. If so, we can use it.

  2. If it’s not included, airflow-dbt-python will try to find an Airflow Connection using the target argument.

Airflow Connections are generally created in the UI, but for illustration purposes we can create one also in our DAG with:

example_git_remote_1_dag.py
 1from airflow import DAG, settings
 2from airflow.models.connection import Connection
 3
 4session = settings.Session()
 5my_conn = Connection(
 6    conn_id="my_db_connection",
 7    conn_type="postgres",
 8    description="A test postgres connection",
 9    host="localhost",
10    login="username",
11    port=5432,
12    schema="my_dbt_schema",
13    password="password",
14    # Other dbt parameters can be added as extras
15    extra=json.dumps(dict(threads=4, sslmode="require")),
16)
17
18session.add(my_conn)
19session.commit()