airflow_dbt_python.hooks

Core dbt hooks

Provides a hook to interact with a dbt project.

class airflow_dbt_python.hooks.dbt.DbtConnectionParam(name, store_override_name=None, default=None)[source]

A tuple indicating connection parameters relevant to dbt.

Parameters:
  • name (str)

  • store_override_name (str | None)

  • default (Any | None)

name

The name of the connection parameter. This name will be used to get the parameter from an Airflow Connection or its extras.

Type:

str

store_override_name

A new name for the connection parameter. If not None, this is the name used in a dbt profiles.yml file.

Type:

str | None

default

A default value if the parameter is not found.

Type:

Any | None

default: Any | None

Alias for field number 2

name: str

Alias for field number 0

property override_name

Returns the override_name if defined, otherwise defaults to name.

>>> DbtConnectionParam("login", "user").override_name
'user'
>>> DbtConnectionParam("port").override_name
'port'
store_override_name: str | None

Alias for field number 1

class airflow_dbt_python.hooks.dbt.DbtHook(*args, dbt_conn_id='dbt_default', project_conn_id=None, profiles_conn_id=None, **kwargs)[source]

A hook to interact with dbt.

Allows for running dbt tasks and provides required configurations for each task.

Parameters:
  • dbt_conn_id (Optional[str])

  • project_conn_id (Optional[str])

  • profiles_conn_id (Optional[str])
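
As a minimal sketch (the connection IDs below are illustrative, not requirements of the hook):

from airflow_dbt_python.hooks.dbt import DbtHook

hook = DbtHook(
    dbt_conn_id="dbt_default",      # Airflow connection for the dbt target
    project_conn_id="aws_default",  # remote holding the dbt project files
    profiles_conn_id="aws_default", # remote holding profiles.yml
)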

dbt_directory(config, upload_dbt_project=False, delete_before_upload=False, replace_on_upload=False, env_vars=None)[source]

Provides a temporary directory to execute dbt.

Creates a temporary directory for dbt to run in and prepares the dbt files if they need to be pulled from S3. If an S3 backend is being used and upload_dbt_project is True, the project is pushed back to S3 before leaving the temporary directory. Pushing the project back enables commands like deps or docs generate, whose outputs would otherwise be discarded along with the temporary directory.

Yields:

The temporary directory’s name.

Parameters:
  • config (BaseConfig)

  • upload_dbt_project (bool)

  • delete_before_upload (bool)

  • replace_on_upload (bool)

  • env_vars (Dict[str, Any] | None)

Return type:

Iterator[str]
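
A sketch of how this context manager is used, assuming a hook as above and an illustrative project_dir:

# Build a task configuration; the project_dir value is hypothetical.
config = hook.get_dbt_task_config("run", project_dir="/opt/airflow/dbt")
with hook.dbt_directory(config, upload_dbt_project=True) as tmp_dir:
    # The dbt project and profiles.yml have been prepared under tmp_dir.
    ...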

download_dbt_profiles(profiles_dir, destination)[source]

Pull a dbt profiles.yml file from a given profiles_dir.

This operation is delegated to a DbtRemoteHook. An optional connection id is supported for remotes that require it.

Parameters:
  • profiles_dir (URLLike)

  • destination (URLLike)

Return type:

Path

download_dbt_project(project_dir, destination)[source]

Pull a dbt project from a given project_dir.

This operation is delegated to a DbtRemoteHook. An optional connection id is supported for remotes that require it.

Parameters:
  • project_dir (URLLike)

  • destination (URLLike)

Return type:

Path

ensure_profiles(config)[source]

Ensure a profiles file exists.

Parameters:

config (BaseConfig)

get_dbt_details_from_connection(conn)[source]

Extract dbt connection details from Airflow Connection.

dbt connection details may be present as Airflow Connection attributes or in the Connection’s extras. This class’s conn_params and conn_extra_params will be used to fetch the required parameters from the Connection’s attributes and extras respectively. If conn_extra_params is empty, all extras are merged into the parameters.

Subclasses may override these class attributes to narrow down the connection details for a specific dbt target (like Postgres or Redshift).

Parameters:

conn (Connection) – The Airflow Connection to extract dbt connection details from.

Returns:

A dictionary of dbt connection details.

Return type:

dict[str, Any]
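
As a hedged sketch, a hypothetical subclass could override conn_params to narrow the details fetched for a specific target (the class name, parameter names, and default port below are illustrative):

from airflow_dbt_python.hooks.dbt import DbtConnectionParam, DbtHook

class DbtMyWarehouseHook(DbtHook):  # hypothetical target hook
    conn_params = [
        DbtConnectionParam("login", "user"),       # stored as "user" in profiles
        DbtConnectionParam("password"),
        DbtConnectionParam("port", default=5432),  # illustrative default
    ]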

get_dbt_target_from_connection(target)[source]

Return a dictionary of connection details to use as a dbt target.

The connection details are fetched from an Airflow connection identified by target or self.dbt_conn_id.

Parameters:

target (str | None) – The target name to use as an Airflow connection ID. If omitted, we will use self.dbt_conn_id.

Returns:

A dictionary with a configuration for a dbt target, or None if a matching Airflow connection is not found for the given dbt target.

Return type:

dict[str, Any] | None
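
For illustration (the connection ID is an assumption):

target = hook.get_dbt_target_from_connection("dbt_default")
if target is None:
    # No matching Airflow connection; dbt will rely on profiles.yml instead.
    ...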

get_dbt_task_config(command, **config_kwargs)[source]

Initialize a configuration for given dbt command with given kwargs.

Parameters:

command (str)

Return type:

BaseConfig

get_remote(scheme, conn_id)[source]

Get a remote to interact with dbt files.

DbtRemoteHooks are identified by the scheme we are looking for and an optional connection ID, used when interfacing with an Airflow hook that requires a connection.

Parameters:
  • scheme (str)

  • conn_id (Optional[str])

Return type:

DbtRemoteHook

prepare_directory(tmp_dir, project_dir, profiles_dir=None)[source]

Prepares a dbt directory for execution of a dbt task.

Preparation involves downloading the required dbt project files and profiles.yml.

Parameters:
  • tmp_dir (str)

  • project_dir (URLLike)

  • profiles_dir (Optional[URLLike])

Return type:

tuple[str, Optional[str]]

run_dbt_task(command, upload_dbt_project=False, delete_before_upload=False, replace_on_upload=False, artifacts=None, env_vars=None, write_perf_info=False, **kwargs)[source]

Run a dbt task with a given configuration and return the results.

The configuration used determines the task that will be run.

Returns:

A tuple containing a boolean indicating success and, optionally, the results of running the dbt command.

Parameters:
  • command (str)

  • upload_dbt_project (bool)

  • delete_before_upload (bool)

  • replace_on_upload (bool)

  • artifacts (Iterable[str] | None)

  • env_vars (Dict[str, Any] | None)

  • write_perf_info (bool)

Return type:

DbtTaskResult
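
A hedged example of running a task and collecting an artifact; the paths are assumptions, and run_results.json is a standard dbt artifact file name:

result = hook.run_dbt_task(
    "run",
    project_dir="/opt/airflow/dbt",   # hypothetical project location
    profiles_dir="/opt/airflow/dbt",  # hypothetical profiles location
    artifacts=["run_results.json"],   # artifact to collect after running
)
if not result.success:
    raise RuntimeError("dbt run failed")
run_results = result.artifacts.get("run_results.json")  # keying by file name is an assumption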

setup_dbt_logging(task, debug)[source]

Set up dbt logging.

Starting with dbt v1, dbt initializes two loggers: default_file and default_stdout. As these are initialized by the CLI app, we need to initialize them here.

Parameters:
  • task (BaseTask)

  • debug (Optional[bool])

upload_dbt_project(project_dir, destination, replace=False, delete_before=False)[source]

Push a dbt project from a given project_dir.

This operation is delegated to a DbtRemoteHook. An optional connection id is supported for remotes that require it.

Parameters:
  • project_dir (URLLike)

  • destination (URLLike)

  • replace (bool)

  • delete_before (bool)

Return type:

None

class airflow_dbt_python.hooks.dbt.DbtPostgresHook(*args, dbt_conn_id='dbt_default', project_conn_id=None, profiles_conn_id=None, **kwargs)[source]

A hook to interact with dbt using a Postgres connection.

Parameters:
  • dbt_conn_id (Optional[str])

  • project_conn_id (Optional[str])

  • profiles_conn_id (Optional[str])

class airflow_dbt_python.hooks.dbt.DbtRedshiftHook(*args, dbt_conn_id='dbt_default', project_conn_id=None, profiles_conn_id=None, **kwargs)[source]

A hook to interact with dbt using a Redshift connection.

Parameters:
  • dbt_conn_id (Optional[str])

  • project_conn_id (Optional[str])

  • profiles_conn_id (Optional[str])

class airflow_dbt_python.hooks.dbt.DbtSnowflakeHook(*args, dbt_conn_id='dbt_default', project_conn_id=None, profiles_conn_id=None, **kwargs)[source]

A hook to interact with dbt using a Snowflake connection.

Parameters:
  • dbt_conn_id (Optional[str])

  • project_conn_id (Optional[str])

  • profiles_conn_id (Optional[str])

class airflow_dbt_python.hooks.dbt.DbtTaskResult(success, run_results, artifacts)[source]

A tuple returned after a dbt task executes.

Parameters:
  • success (bool)

  • run_results (Optional[RunResult])

  • artifacts (dict[str, Any])

success

Whether the task succeeded or not.

Type:

bool

run_results

Results from the dbt task, if available.

Type:

Optional[RunResult]

artifacts

A dictionary of saved dbt artifacts. It may be empty.

Type:

dict[str, Any]

artifacts: dict[str, Any]

Alias for field number 2

run_results: RunResult | None

Alias for field number 1

success: bool

Alias for field number 0
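
Because DbtTaskResult is a tuple, results unpack positionally (continuing the hypothetical hook from above):

success, run_results, artifacts = hook.run_dbt_task("test")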

class airflow_dbt_python.hooks.dbt.DbtTemporaryDirectory(suffix=None, prefix=None, dir=None, ignore_cleanup_errors=True)[source]

A wrapper on TemporaryDirectory for older versions of Python.

Support for ignore_cleanup_errors was added in Python 3.10. There is a very obscure error that can happen when cleaning up a directory, even though everything should be cleaned. We would like to use ignore_cleanup_errors to provide clean up on a best-effort basis. For the time being, we are addressing this only for Python>=3.10.

Supported dbt remote hooks

The DbtRemoteHook interface

The DbtRemoteHook interface includes methods for downloading and uploading files.

Internally, DbtRemoteHooks can use Airflow hooks to execute the actual operations.

Currently, only AWS S3 and the local filesystem are supported as remotes.

class airflow_dbt_python.hooks.remote.DbtRemoteHook(context=None)[source]

Represents a remote storing dbt files for a dbt project.

A concrete remote class should implement the upload and download methods to fetch one or more dbt files. Remotes can rely on an Airflow connection with a corresponding hook, but this is not enforced.

Delegating the responsibility of dealing with dbt files to backend subclasses allows us to support more backends without changing the DbtHook.

connection_id

An optional Airflow connection. If defined, will be used to instantiate a hook for this backend.

abstract download(source, destination, replace=False, delete_before=False)[source]

Download source URL into local destination URL.

Parameters:
  • source (URL)

  • destination (URL)

  • replace (bool)

  • delete_before (bool)

download_dbt_profiles(source, destination)[source]

Download a dbt profiles.yml file from a given source.

Parameters:
  • source (URL | str | Path) – URLLike pointing to a remote containing a profiles.yml file.

  • destination (URL | str | Path) – URLLike to a directory where the profiles.yml will be stored.

Returns:

The destination Path.

Return type:

Path

download_dbt_project(source, destination)[source]

Download all dbt project files from a given source.

Parameters:
  • source (URL | str | Path) – URLLike to a directory containing a dbt project.

• destination (URL | str | Path) – URLLike to a directory where the dbt project files will be stored.

Returns:

The destination Path.

Return type:

Path

abstract upload(source, destination, replace=False, delete_before=False)[source]

Upload a local source URL into destination URL.

Parameters:
  • source (URL)

  • destination (URL)

  • replace (bool)

  • delete_before (bool)

upload_dbt_project(source, destination, replace=False, delete_before=False)[source]

Upload all dbt project files from a given source.

Parameters:
  • source (URL | str | Path) – URLLike to a directory containing a dbt project.

  • destination (URL | str | Path) – URLLike to a directory where the dbt project will be stored.

  • replace (bool) – Flag to indicate whether to replace existing files.

• delete_before (bool) – Flag to indicate whether to clear any existing files before uploading the dbt project.

airflow_dbt_python.hooks.remote.get_remote(scheme, conn_id=None)[source]

Get a DbtRemoteHook as long as the scheme is supported.

In the future we should make our hooks discoverable and package ourselves as a proper Airflow providers package.

Parameters:
  • scheme (str)

  • conn_id (str | None)

Return type:

DbtRemoteHook
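
For example (the scheme, connection ID, and URLs are illustrative):

from airflow_dbt_python.hooks.remote import get_remote

remote = get_remote("s3", conn_id="aws_default")
remote.download_dbt_project("s3://my-bucket/dbt/", "/tmp/dbt")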

dbt git remote

dbt localfs remote

A local filesystem remote for dbt.

Intended to be used only when running Airflow with a LocalExecutor.

class airflow_dbt_python.hooks.localfs.DbtLocalFsRemoteHook(fs_conn_id='fs_default')[source]

A concrete dbt remote for a local filesystem.

This remote is intended to be used when running Airflow with a LocalExecutor, and it relies on shutil from the standard library to do all the file manipulation. For these reasons, running multiple concurrent tasks with this remote may lead to race conditions if attempting to push files to the remote.

Parameters:

fs_conn_id (str)
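
A sketch of using this remote directly; the URL import path and both directories are assumptions:

from airflow_dbt_python.hooks.localfs import DbtLocalFsRemoteHook
from airflow_dbt_python.utils.url import URL  # assumed import path for URL

remote = DbtLocalFsRemoteHook(fs_conn_id="fs_default")
remote.upload(
    URL("/opt/airflow/dbt"),               # hypothetical local project
    URL("/data/dbt-projects/my_project"),  # hypothetical destination
    replace=True,
)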

copy(source, destination, replace=False, delete_before=False)[source]

Push all dbt files under the source directory to another local path.

Pushing supports zipped projects: the destination will be used to determine if we are working with a zip file by looking at the file extension.

Parameters:
  • source (URL) – A local file path where to fetch the files to push.

  • destination (URL) – A local path where the file should be copied.

  • replace (bool) – Whether to replace existing files or not.

  • delete_before (bool) – Whether to delete the contents of destination before pushing.

Return type:

None

copy_one(source, destination, replace=False)[source]

Copy one file from a local path.

If the file already exists, it will be ignored if replace is False (the default).

Parameters:
  • source (URL) – A local path to the file to copy.

  • destination (URL) – A destination path for the copied file.

  • replace (bool) – A bool flag to indicate whether to replace an existing file.

Return type:

None

download(source, destination, replace=False, delete_before=False)[source]

Implement download method of dbt remote interface.

For a local filesystem, this copies the source directory or file to destination.

Parameters:
  • source (URL)

  • destination (URL)

  • replace (bool)

  • delete_before (bool)

Return type:

None

get_url(url)[source]

Return a URL relative to this hook’s basepath.

If the given URL is absolute, simply return it. If it is None, return a URL made from the basepath.

Parameters:

url (URL | None)

Return type:

URL

upload(source, destination, replace=False, delete_before=False)[source]

Implement upload method of dbt remote interface.

For a local filesystem, this copies the source directory or file to destination.

Parameters:
  • source (URL)

  • destination (URL)

  • replace (bool)

  • delete_before (bool)

Return type:

None

airflow_dbt_python.hooks.localfs.py37_copytree(source, destination, replace=True)[source]

A (probably) poor attempt at replicating shutil.copytree for Python 3.7.

shutil.copytree is available in Python 3.7; however, it doesn’t have the dirs_exist_ok parameter, which we need. If the destination path doesn’t exist, we can use shutil.copytree; if it does exist, we need to copy files one by one and make any subdirectories ourselves.

Parameters:
  • source (URL)

  • destination (URL)

  • replace (bool)

dbt S3 remote

An implementation for an S3 remote for dbt.

class airflow_dbt_python.hooks.s3.DbtS3RemoteHook(*args, **kwargs)[source]

A dbt remote implementation for S3.

This concrete remote class implements the DbtRemote interface by using S3 as storage for uploading and downloading dbt files. The DbtS3RemoteHook subclasses Airflow’s S3Hook to interact with S3. A connection ID may be passed to set the connection to use with S3.
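
A sketch of uploading a project with this remote; the URL import path, bucket, and connection ID are assumptions:

from airflow_dbt_python.hooks.s3 import DbtS3RemoteHook
from airflow_dbt_python.utils.url import URL  # assumed import path for URL

remote = DbtS3RemoteHook(aws_conn_id="aws_default")  # passed through to Airflow's S3Hook
remote.upload(
    URL("/opt/airflow/dbt"),     # hypothetical local project
    URL("s3://my-bucket/dbt/"),  # hypothetical S3 prefix
    replace=True,
)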

download(source, destination, replace=False, delete_before=False)[source]

Download one or more files from a source URL in S3.

Lists all S3 keys that have source as a prefix to find what to download.

Parameters:
  • source (URL) – An S3 URL to a key prefix containing objects to download.

  • destination (URL) – A destination URL where to download the objects to. The existing sub-directory hierarchy in S3 will be preserved.

• replace (bool) – Indicates whether to replace existing files when downloading. This flag is kept here to comply with the DbtRemote interface, but it is ignored, as files downloaded from S3 always overwrite local files.

  • delete_before (bool) – Delete destination directory before download.

download_s3_object(s3_object, destination)[source]

Download an S3 object into a local destination.

Parameters:

destination (URL)

Return type:

None

iter_url(source)[source]

Iterate over an S3 key given by a URL.

Parameters:

source (URL)

Return type:

Iterable[URL]

load_file_handle_replace_error(file_url, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[source]

Calls S3Hook.load_file but handles ValueError when replacing existing keys.

Will also log a warning whenever attempting to replace an existing key with replace = False.

Returns:

True if no ValueError was raised, False otherwise.

Parameters:
  • file_url (URL | str | Path)

  • key (str)

  • bucket_name (str | None)

  • replace (bool)

  • encrypt (bool)

  • gzip (bool)

  • acl_policy (str | None)

Return type:

bool

upload(source, destination, replace=False, delete_before=False)[source]

Upload one or more files under source URL to S3.

Parameters:
• source (URL) – A local URL where to fetch the file or files to push.

  • destination (URL) – An S3 URL where the file should be uploaded. The bucket name and key prefix will be extracted by calling S3Hook.parse_s3_url.

  • replace (bool) – Whether to replace existing S3 keys or not.

  • delete_before (bool) – Whether to delete the contents of destination before pushing.

Return type:

None