Source code for airflow_dbt_python.hooks.remote

"""The DbtRemoteHook interface includes methods for downloading and uploading files.

Internally, DbtRemoteHooks can use Airflow hooks to execute the actual operations.

Currently, only AWS S3 and the local filesystem are supported as remotes.
"""

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional, Type

from airflow.utils.log.logging_mixin import LoggingMixin

from airflow_dbt_python.utils.url import URL, URLLike

StrPath = str


[docs] class DbtRemoteHook(ABC, LoggingMixin): """Represents a dbt project storing any dbt files. A concrete backend class should implement the push and pull methods to fetch one or more dbt files. Backends can rely on an Airflow connection with a corresponding hook, but this is not enforced. Delegating the responsibility of dealing with dbt files to backend subclasses allows us to support more backends without changing the DbtHook. Attributes: connection_id: An optional Airflow connection. If defined, will be used to instantiate a hook for this backend. """
[docs] @abstractmethod def download( self, source: URL, destination: URL, replace: bool = False, delete_before: bool = False, ): """Download source URL into local destination URL.""" return NotImplemented
[docs] @abstractmethod def upload( self, source: URL, destination: URL, replace: bool = False, delete_before: bool = False, ): """Upload a local source URL into destination URL.""" return NotImplemented
[docs] def download_dbt_project(self, source: URLLike, destination: URLLike) -> Path: """Download all dbt project files from a given source. Args: source: URLLike to a directory containing a dbt project. destination: URLLike to a directory where the will be stored. Returns: The destination Path. """ source_url = URL(source) destination_url = URL(destination) self.log.info("Downloading dbt project from %s to %s", source, destination) if source_url.is_archive(): destination_url = destination_url / source_url.name self.download(source_url, destination_url) if destination_url.exists() and destination_url.is_archive(): destination_url.extract() destination_url.unlink() destination_path = destination_url.parent.path else: destination_path = destination_url.path return destination_path
[docs] def download_dbt_profiles(self, source: URLLike, destination: URLLike) -> Path: """Download a dbt profiles.yml file from a given source. Args: source: URLLike pointing to a remote containing a profiles.yml file. destination: URLLike to a directory where the profiles.yml will be stored. Returns: The destination Path. """ source_url = URL(source) destination_url = URL(destination) self.log.info("Downloading dbt profiles from %s to %s", source, destination) if source_url.name != "profiles.yml": source_url = source_url / "profiles.yml" if destination_url.is_dir() or destination_url.name != "profiles.yml": destination_url = destination_url / "profiles.yml" self.download(source_url, destination_url) return destination_url.path
[docs] def upload_dbt_project( self, source: URLLike, destination: URLLike, replace: bool = False, delete_before: bool = False, ): """Upload all dbt project files from a given source. Args: source: URLLike to a directory containing a dbt project. destination: URLLike to a directory where the dbt project will be stored. replace: Flag to indicate whether to replace existing files. delete_before: Flag to indicate wheter to clear any existing files before uploading the dbt project. """ source_url = URL(source) destination_url = URL(destination) self.log.info("Uploading dbt project from %s to %s", source, destination) if destination_url.is_archive() and source_url.is_dir(): zip_url = source_url / destination_url.name source_url.archive(zip_url) source_url = zip_url self.upload(source_url, destination_url, replace, delete_before) if destination_url.is_archive(): source_url.unlink()
[docs] def get_remote(scheme: str, conn_id: Optional[str] = None) -> DbtRemoteHook: """Get a DbtRemoteHook as long as the scheme is supported. In the future we should make our hooks discoverable and package ourselves as a proper Airflow providers package. """ if scheme == "s3": from .s3 import DbtS3RemoteHook remote_cls: Type[DbtRemoteHook] = DbtS3RemoteHook elif scheme in ("https", "git", "git+ssh", "ssh", "http"): from .git import DbtGitRemoteHook remote_cls = DbtGitRemoteHook elif scheme == "": from .localfs import DbtLocalFsRemoteHook remote_cls = DbtLocalFsRemoteHook else: raise NotImplementedError(f"Backend {scheme} is not supported") if conn_id is not None: remote = remote_cls(conn_id) else: remote = remote_cls() return remote