airflow.providers.dbt.cloud.operators.dbt

DbtCloudRunJobOperatorLink

允许用户直接在 dbt Cloud 中监控触发的作业运行。

DbtCloudRunJobOperator

执行一个 dbt Cloud 作业。

DbtCloudGetJobRunArtifactOperator

从 dbt Cloud 作业运行下载工件。

DbtCloudListJobsOperator

列出 dbt Cloud 项目中的作业。

模块内容

基类: airflow.sdk.BaseOperatorLink

允许用户直接在 dbt Cloud 中监控触发的作业运行。

name = '监控作业运行'[source]

链接的名称。这将是任务界面上的按钮名称。

指向外部系统的链接。

参数:
  • operator (airflow.models.BaseOperator) – 此链接关联的 Airflow Operator 对象。

  • ti_key – 要返回链接的任务实例 ID。

返回:

指向外部系统的链接

class airflow.providers.dbt.cloud.operators.dbt.DbtCloudRunJobOperator(*, dbt_cloud_conn_id=DbtCloudHook.default_conn_name, job_id=None, project_name=None, environment_name=None, job_name=None, account_id=None, trigger_reason=None, steps_override=None, schema_override=None, wait_for_termination=True, timeout=60 * 60 * 24 * 7, check_interval=60, additional_run_config=None, reuse_existing_run=False, retry_from_failure=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.models.BaseOperator

执行一个 dbt Cloud 作业。

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南: 触发一个 dbt Cloud 作业

参数:
  • dbt_cloud_conn_id (str) – 用于连接 dbt Cloud 的连接 ID。

  • job_id (int | None) – dbt Cloud 作业的 ID。如果未提供 project_name、environment_name 和 job_name,则此项必填。

  • project_name (str | None) – 可选。dbt Cloud 项目的名称。仅当 job_id 为 None 时使用。

  • environment_name (str | None) – 可选。dbt Cloud 环境的名称。仅当 job_id 为 None 时使用。

  • job_name (str | None) – 可选。dbt Cloud 作业的名称。仅当 job_id 为 None 时使用。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • trigger_reason (str | None) – 可选。触发作业原因的描述。默认为“通过 Apache Airflow 在 DAG 中由任务 触发。”

  • steps_override (list[str] | None) – 可选。触发作业时执行的 dbt 命令列表,而不是 dbt Cloud 中配置的命令。

  • schema_override (str | None) – 可选。覆盖此作业在配置目标中的目标模式(schema)。

  • wait_for_termination (bool) – 等待作业运行终止的标志。默认情况下启用此功能,但可以禁用此功能,以便使用 DbtCloudJobRunSensor 对长时间运行的作业执行进行异步等待。

  • timeout (int) – 对于非异步等待,等待作业运行达到终端状态的超时时间(秒)。仅当 wait_for_termination 为 True 时使用。默认为 7 天。

  • check_interval (int) – 对于非异步等待,检查作业运行状态的间隔时间(秒)。仅当 wait_for_termination 为 True 时使用。默认为 60 秒。

  • additional_run_config (dict[str, Any] | None) – 可选。触发作业时应包含在 API 请求中的任何附加参数。

  • reuse_existing_run (bool) – 标志,用于确定是否重用现有的非终端作业运行。如果设置为 true 且找到非终端作业运行,则使用最新的运行,而不会触发新的作业运行。

  • retry_from_failure (bool) – 标志,用于确定是否从失败处重试作业运行。如果设置为 true 且上次作业运行失败,则触发具有与失败运行相同配置的新作业运行。有关重试逻辑的更多信息,请参阅: https://docs.getdbt.com/dbt-cloud/api-v2#/operations/Retry%20Failed%20Job

  • deferrable (bool) – 在 deferrable 模式下运行 Operator

返回:

触发的 dbt Cloud 作业运行的 ID。

template_fields = ('dbt_cloud_conn_id', 'job_id', 'project_name', 'environment_name', 'job_name', 'account_id',...[source]
dbt_cloud_conn_id = 'dbt_cloud_default'[source]
account_id = None[source]
job_id = None[source]
project_name = None[source]
environment_name = None[source]
job_name = None[source]
trigger_reason = None[source]
steps_override = None[source]
schema_override = None[source]
wait_for_termination = True[source]
timeout = 604800[source]
check_interval = 60[source]
additional_run_config[source]
run_id: int | None = None[source]
reuse_existing_run = False[source]
retry_from_failure = False[source]
deferrable = True[source]
execute(context)[source]

创建 Operator 时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

触发器触发时执行 - 立即返回。

on_kill()[source]

重写此方法以在任务实例被杀死时清理子进程。

在 Operator 中使用 threading、subprocess 或 multiprocessing 模块的任何地方都需要清理,否则会留下僵尸进程。

property hook[source]

返回 DBT Cloud Hook。

get_openlineage_facets_on_complete(task_instance)[source]

实现 _on_complete,因为 job_run 需要首先在 execute 方法中触发。

仅当 Operator 的 wait_for_termination 设置为 True 时,才应发送附加事件。

class airflow.providers.dbt.cloud.operators.dbt.DbtCloudGetJobRunArtifactOperator(*, dbt_cloud_conn_id=DbtCloudHook.default_conn_name, run_id, path, account_id=None, step=None, output_file_name=None, **kwargs)[source]

基类: airflow.models.BaseOperator

从 dbt Cloud 作业运行下载工件。

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南: 下载运行工件

参数:
  • dbt_cloud_conn_id (str) – 用于连接 dbt Cloud 的连接 ID。

  • run_id (int) – dbt Cloud 作业运行的 ID。

  • path (str) – 与工件文件相关的的文件路径。路径根植于 target/ 目录。使用“manifest.json”、“catalog.json”或“run_results.json”下载 dbt 为此运行生成的工件。

  • account_id (int | None) – 可选。dbt Cloud 账户的 ID。

  • step (int | None) – 可选。查询工件的运行步骤索引。运行中的第一步索引为 1。如果省略 step 参数,将返回运行中最后一步的工件。

  • output_file_name (str | None) – 可选。下载工件文件所需的 文件名。默认为 _(例如 “728368_run_results.json”)。

template_fields = ('dbt_cloud_conn_id', 'run_id', 'path', 'account_id', 'output_file_name')[source]
dbt_cloud_conn_id = 'dbt_cloud_default'[source]
run_id[source]
path[source]
account_id = None[source]
step = None[source]
output_file_name = ''[source]
execute(context)[source]

创建 Operator 时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.dbt.cloud.operators.dbt.DbtCloudListJobsOperator(*, dbt_cloud_conn_id=DbtCloudHook.default_conn_name, account_id=None, project_id=None, order_by=None, **kwargs)[source]

基类: airflow.models.BaseOperator

列出 dbt Cloud 项目中的作业。

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南: 列出作业

检索与指定 dbt Cloud 账户关联的所有作业的元数据。如果提供了 project_id,则仅检索与此项目 ID 相关的作业。

参数:
  • account_id (int | None) – 可选。如果未显式提供账户 ID,将使用 dbt Cloud 连接中的账户 ID。

  • order_by (str | None) – 可选的。用于排序结果的字段。使用“-”表示反向排序。例如,要根据运行 ID 使用反向排序,请使用 order_by=-id

  • project_id (int | None) – 可选的。dbt Cloud 项目的 ID。

template_fields = ('account_id', 'project_id')[源]
dbt_cloud_conn_id = 'dbt_cloud_default'[源]
account_id = None[源]
project_id = None[源]
order_by = None[源]
execute(context)[源]

创建 Operator 时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

此条目有帮助吗?