airflow.providers.dbt.cloud.hooks.dbt

属性

DBT_CAUSE_MAX_LENGTH

T

异常

DbtCloudJobRunException

表示作业运行未能完成的异常。

DbtCloudResourceLookupError

当无法唯一标识 dbt Cloud 资源时引发的异常。

TokenAuth

执行请求时用于身份验证的辅助类。

JobRunInfo

用于 job_run_info 字典的类型类。

DbtCloudJobRunStatus

dbt Cloud 作业状态。

DbtCloudHook

使用 V2(如果支持则使用 V3)API 与 dbt Cloud 交互。

函数

fallback_to_default_account(func)

account_id 提供回退值。

provide_account_id(func)

account_id 提供回退值。

模块内容

airflow.providers.dbt.cloud.hooks.dbt.DBT_CAUSE_MAX_LENGTH = 255[source]
airflow.providers.dbt.cloud.hooks.dbt.fallback_to_default_account(func)[source]

account_id 提供回退值。

如果 account_id 为 None 或未传递给被装饰的函数,该值将取自配置的 dbt Cloud Airflow 连接。

class airflow.providers.dbt.cloud.hooks.dbt.TokenAuth(token)[source]

基类: requests.auth.AuthBase

执行请求时用于身份验证的辅助类。

token[source]
__call__(request)[source]
class airflow.providers.dbt.cloud.hooks.dbt.JobRunInfo[source]

基类: TypedDict

用于 job_run_info 字典的类型类。

account_id: int | None[source]
run_id: int[source]
class airflow.providers.dbt.cloud.hooks.dbt.DbtCloudJobRunStatus[source]

基类: enum.Enum

dbt Cloud 作业状态。

QUEUED = 1[source]
STARTING = 2[source]
RUNNING = 3[source]
SUCCESS = 10[source]
ERROR = 20[source]
CANCELLED = 30[source]
NON_TERMINAL_STATUSES[source]
TERMINAL_STATUSES[source]
classmethod check_is_valid(statuses)[source]

验证输入状态为已知值。

classmethod is_terminal(status)[source]

检查输入状态是否为终端类型。

exception airflow.providers.dbt.cloud.hooks.dbt.DbtCloudJobRunException[source]

基类: airflow.exceptions.AirflowException

表示作业运行未能完成的异常。

exception airflow.providers.dbt.cloud.hooks.dbt.DbtCloudResourceLookupError[source]

基类: airflow.exceptions.AirflowException

当无法唯一标识 dbt Cloud 资源时引发的异常。

airflow.providers.dbt.cloud.hooks.dbt.T[source]
airflow.providers.dbt.cloud.hooks.dbt.provide_account_id(func)[source]

account_id 提供回退值。

如果 account_id 为 None 或未传递给被装饰的函数,该值将取自配置的 dbt Cloud Airflow 连接。

class airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook(dbt_cloud_conn_id=default_conn_name, *args, **kwargs)[source]

基类: airflow.providers.http.hooks.http.HttpHook

使用 V2(如果支持则使用 V3)API 与 dbt Cloud 交互。

参数:

dbt_cloud_conn_id (str) – dbt Cloud 连接的 ID。

conn_name_attr = 'dbt_cloud_conn_id'[source]
default_conn_name = 'dbt_cloud_default'[source]
conn_type = 'dbt_cloud'[source]
hook_name = 'dbt Cloud'[source]
classmethod get_ui_field_behaviour()[source]

为 Airflow UI 中的 dbt Cloud 连接表单构建自定义字段行为。

dbt_cloud_conn_id = 'dbt_cloud_default'[source]
static get_request_url_params(tenant, endpoint, include_related=None, *, api_version='v2')[source]

从基本 URL 和端点 URL 构成 URL。

参数:
  • tenant (str) – 需要在基本 URL 中替换的租户域名。

  • endpoint (str) – 要请求的端点 URL。

  • include_related (list[str] | None) – 可选。与运行一起获取的相关字段列表。有效值包括 “trigger”、“job”、“repository” 和 “environment”。

async get_headers_tenants_from_connection()[source]

从连接详情获取 Headers 和租户信息。

async get_job_details(run_id, account_id=None, include_related=None)[source]

使用异步 HTTP 调用检索特定 dbt Cloud 作业运行的元数据。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • include_related (list[str] | None) – 可选。与运行一起获取的相关字段列表。有效值包括 “trigger”、“job”、“repository” 和 “environment”。

async get_job_status(run_id, account_id=None, include_related=None)[source]

检索特定 dbt Cloud 作业运行的状态。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • include_related (list[str] | None) – 可选。与运行一起获取的相关字段列表。有效值包括 “trigger”、“job”、“repository” 和 “environment”。

property connection: airflow.models.Connection[source]
get_conn(*args, **kwargs)[source]

创建一个 Requests HTTP 会话。

参数:
  • headers – 作为字典传递的额外 Headers。

  • extra_options – 执行请求时使用的额外选项

返回:

一个配置好的 requests.Session 对象。

返回类型:

requests.sessions.Session

list_accounts()[source]

检索配置的 API 令牌授权访问的所有 dbt Cloud 账户。

返回:

请求响应列表。

返回类型:

list[requests.models.Response]

get_account(account_id=None)[source]

检索特定 dbt Cloud 账户的元数据。

参数:

account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

请求响应。

返回类型:

requests.models.Response

list_projects(account_id=None, name_contains=None)[source]

检索与指定 dbt Cloud 账户关联的所有项目的元数据。

参数:
  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • name_contains (str | None) – 可选。用于过滤 dbt Cloud 项目名称的不区分大小写的子字符串。

返回:

请求响应列表。

返回类型:

list[requests.models.Response]

get_project(project_id, account_id=None)[source]

检索特定项目的元数据。

参数:
  • project_id (int) – dbt Cloud 项目的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

请求响应。

返回类型:

requests.models.Response

list_environments(project_id, *, name_contains=None, account_id=None)[source]

检索与指定 dbt Cloud 项目关联的所有环境的元数据。

参数:
  • project_id (int) – dbt Cloud 项目的 ID。

  • name_contains (str | None) – 可选。用于过滤 dbt Cloud 环境名称的不区分大小写的子字符串。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

请求响应列表。

返回类型:

list[requests.models.Response]

get_environment(project_id, environment_id, *, account_id=None)[source]

检索特定项目环境的元数据。

参数:
  • project_id (int) – dbt Cloud 项目的 ID。

  • environment_id (int) – dbt Cloud 环境的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

请求响应。

返回类型:

requests.models.Response

list_jobs(account_id=None, order_by=None, project_id=None, environment_id=None, name_contains=None)[source]

检索与指定 dbt Cloud 账户关联的所有作业的元数据。

如果提供了 project_id,将仅检索与此项目相关的作业。如果提供了 environment_id,将仅检索与此环境相关的作业。

参数:
  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • order_by (str | None) – 可选。用于排序结果的字段。使用 ‘-’ 表示反向排序。例如,要按运行 ID 反向排序,请使用 order_by=-id

  • project_id (int | None) – 可选。dbt Cloud 项目的 ID。

  • environment_id (int | None) – 可选。dbt Cloud 环境的 ID。

  • name_contains (str | None) – 可选。用于过滤 dbt Cloud 作业名称的不区分大小写的子字符串。

返回:

请求响应列表。

返回类型:

list[requests.models.Response]

get_job(job_id, account_id=None)[source]

检索特定作业的元数据。

参数:
  • job_id (int) – dbt Cloud 作业的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

请求响应。

返回类型:

requests.models.Response

get_job_by_name(*, project_name, environment_name, job_name, account_id=None)[source]

通过项目、环境和作业名称的组合检索特定作业的元数据。

如果找不到作业或无法通过提供的参数唯一标识作业,则引发 DbtCloudResourceLookupError 异常。

参数:
  • project_name (str) – dbt Cloud 项目的名称。

  • environment_name (str) – dbt Cloud 环境的名称。

  • job_name (str) – dbt Cloud 作业的名称。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

作业的详情。

返回类型:

dict

trigger_job_run(job_id, cause, account_id=None, steps_override=None, schema_override=None, retry_from_failure=False, additional_run_config=None)[source]

触发一个 dbt Cloud 作业的运行。

参数:
  • job_id (int) – dbt Cloud 作业的 ID。

  • cause (str) – 描述触发作业的原因。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • steps_override (list[str] | None) – 可选。触发作业时要执行的 dbt 命令列表,这些命令将覆盖在 dbt Cloud 中配置的命令。

  • schema_override (str | None) – 可选。覆盖此作业中配置目标位置的目标 Schema。

  • retry_from_failure (bool) – 可选。如果设置为 True 并且前一次作业运行失败,将使用“重跑”端点触发作业。此参数不能与 steps_override、schema_override 或 additional_run_config 同时使用。

  • additional_run_config (dict[str, Any] | None) – 可选。触发作业时 API 请求中应包含的任何附加参数。

返回:

请求响应。

返回类型:

requests.models.Response

list_job_runs(account_id=None, include_related=None, job_definition_id=None, order_by=None)[source]

检索某个账号下所有 dbt Cloud 作业运行的元数据。

如果提供了 job_definition_id,则只拉取该特定作业运行的元数据。

参数:
  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • include_related (list[str] | None) – 可选。与运行一起获取的相关字段列表。有效值包括 “trigger”、“job”、“repository” 和 “environment”。

  • job_definition_id (int | None) – 可选。要检索运行元数据的 dbt Cloud 作业 ID。

  • order_by (str | None) – 可选。用于排序结果的字段。使用 ‘-’ 表示反向排序。例如,要按运行 ID 反向排序,请使用 order_by=-id

返回:

请求响应列表。

返回类型:

list[requests.models.Response]

get_job_runs(account_id=None, payload=None)[source]

检索某个 dbt Cloud 作业特定运行的元数据。

参数:
  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • paylod – 可选。查询参数

返回:

请求响应。

返回类型:

requests.models.Response

get_job_run(run_id, account_id=None, include_related=None)[source]

检索某个 dbt Cloud 作业特定运行的元数据。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • include_related (list[str] | None) – 可选。与运行一起获取的相关字段列表。有效值包括 “trigger”、“job”、“repository” 和 “environment”。

返回:

请求响应。

返回类型:

requests.models.Response

get_job_run_status(run_id, account_id=None)[source]

检索特定 dbt Cloud 作业运行的状态。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

dbt Cloud 作业运行的状态。

返回类型:

int

wait_for_job_run_status(run_id, account_id=None, expected_statuses=DbtCloudJobRunStatus.SUCCESS.value, check_interval=60, timeout=60 * 60 * 24 * 7)[source]

等待 dbt Cloud 作业运行达到预期状态。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • expected_statuses (int | collections.abc.Sequence[int] | set[int]) – 可选。要与作业运行当前状态进行比较的期望状态值。默认为成功状态值。

  • check_interval (int) – 检查流水线运行状态的时间间隔(秒)。

  • timeout (int) – 等待流水线达到终止状态或预期状态的超时时间(秒)。

返回:

一个布尔值,指示作业运行是否已达到 expected_status 状态。

返回类型:

bool

cancel_job_run(run_id, account_id=None)[source]

取消特定的 dbt Cloud 作业运行。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

list_job_run_artifacts(run_id, account_id=None, step=None)[source]

检索已完成的 dbt Cloud 作业运行生成的可用工件文件列表。

默认情况下,此方法返回运行中最后一步的工件。要列出运行中其他步骤的工件,请使用 step 参数。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • step (int | None) – 可选。要查询工件的运行中步骤的索引。运行中的第一个步骤索引为 1。如果省略 step 参数,将返回运行中最后一步的工件。

返回:

请求响应列表。

返回类型:

list[requests.models.Response]

get_job_run_artifact(run_id, path, account_id=None, step=None)[source]

检索已完成的 dbt Cloud 作业运行生成的可用工件文件列表。

默认情况下,此方法返回运行中最后一步的工件。要列出运行中其他步骤的工件,请使用 step 参数。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • path (str) – 与工件文件相关的路径。路径根目录为 target/。使用“manifest.json”、“catalog.json”或“run_results.json”可下载此运行生成的 dbt 工件。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • step (int | None) – 可选。要查询工件的运行中步骤的索引。运行中的第一个步骤索引为 1。如果省略 step 参数,将返回运行中最后一步的工件。

返回:

请求响应。

返回类型:

requests.models.Response

async get_job_run_artifacts_concurrently(run_id, artifacts, account_id=None, step=None)[source]

检索已完成的 dbt Cloud 作业运行中某个步骤生成的选定工件文件列表。

默认情况下,此方法返回运行中最后一步的工件。此方法利用异步调用来加快检索速度。

参数:
  • run_id (int) – dbt Cloud 作业运行的 ID。

  • step (int | None) – 要查询工件的运行中步骤的索引。运行中的第一个步骤索引为 1。如果省略 step 参数,将返回运行中最后一步的工件。

  • path – 与工件文件相关的路径。路径根目录为 target/。使用“manifest.json”、“catalog.json”或“run_results.json”可下载此运行生成的 dbt 工件。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

请求响应。

retry_failed_job_run(job_id, account_id=None)[source]

如果作业运行失败,则从失败点重试该运行。否则,触发新的运行。

参数:
  • job_id (int) – dbt Cloud 作业的 ID。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

返回:

请求响应。

返回类型:

requests.models.Response

test_connection()[source]

测试 dbt Cloud 连接。

此条目是否有帮助?