dbt Cloud 操作器

这些操作器可以执行 dbt Cloud 作业,轮询当前正在执行的作业的状态,并在本地下载运行工件。

每个操作器可以通过两种方式绑定到特定的 dbt Cloud 帐户

  • 将帐户 ID(通过 account_id 参数)显式提供给操作器。

  • 或者,在 Airflow 连接中指定 dbt Cloud 帐户。如果没有将帐户 ID 传递给操作器,操作器将自动回退使用此帐户。

触发 dbt Cloud 作业

使用 DbtCloudRunJobOperator 触发 dbt Cloud 作业的运行。默认情况下,操作器将定期检查已执行作业的状态,每隔 check_interval 秒或直到作业达到 timeout 执行时间长度后以成功状态终止。此功能由 wait_for_termination 参数控制。或者,可以将 wait_for_termination 设置为 False 以执行异步等待(通常与 DbtCloudJobRunSensor 配对使用)。对于长时间运行的 dbt Cloud 作业,将 wait_for_termination 设置为 False 是一种很好的方法。

deferrable 参数与 wait_for_termination 将控制是在工作器上轮询作业状态还是使用触发器延迟的功能。当 wait_for_termination 为 True 且 deferrable 为 False 时,我们提交作业并在工作器上轮询其状态。这将使工作器插槽保持占用状态,直到作业执行完成。当 wait_for_termination 为 True 且 deferrable 为 True 时,我们提交作业并使用触发器延迟。这将释放工作器插槽,从而在作业运行时节省资源利用率。

wait_for_termination 为 False 且 deferrable 为 False 时,我们只提交作业,并且只能使用 DbtCloudJobRunSensor 跟踪作业状态。

retry_from_failure 为 True 时,如果运行失败,我们将从失败点开始重试作业的运行。否则,我们将触发新的运行。有关重试逻辑的更多信息,请参阅API 文档

虽然 schema_overridesteps_overrideDbtCloudRunJobOperator 的显式可选参数,但也可以使用 additional_run_config 字典将自定义运行配置传递给操作器。此参数可用于初始化作业运行的其他运行时配置或覆盖,例如 threads_overridegenerate_docs_overridegit_branch 等。有关可在运行时使用的其他配置的完整列表,请参阅API 文档

以下示例演示了如何分别使用同步和异步等待运行终止来实例化 DbtCloudRunJobOperator 任务。需要注意的是,操作器的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/providers/dbt/cloud/example_dbt_cloud.py[源代码]

trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)

下一个示例还展示了如何通过 additional_run_config 字典传入自定义运行时配置(在本例中为 threads_override)。

tests/system/providers/dbt/cloud/example_dbt_cloud.py[源代码]

trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)

轮询 dbt Cloud 作业运行的状态

使用 DbtCloudJobRunSensor 定期检索 dbt Cloud 作业运行的状态,并检查运行是否已成功。此传感器提供了 BaseSensorOperator 可用的所有相同功能。

在下面的示例中,下面示例中的 run_id 值来自先前 DbtCloudRunJobOperator 任务的输出,方法是利用为所有操作器公开的 .output 属性。此外,需要注意的是,任务的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/providers/dbt/cloud/example_dbt_cloud.py[源代码]

job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)

此外,您还可以使用 deferrable 模式异步轮询作业运行的状态。在此模式下,工作器插槽在传感器运行时被释放。

tests/system/providers/dbt/cloud/example_dbt_cloud.py[源代码]

job_run_sensor_deferred = DbtCloudJobRunSensor(
    task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)

下载运行工件

使用 DbtCloudGetJobRunArtifactOperator 下载 dbt Cloud 作业运行的 dbt 生成的工件。指定的 path 值应以 target/ 目录为根目录。典型的工件包括 manifest.jsoncatalog.jsonrun_results.json,但也可以下载其他工件,例如模型的原始 SQL 或 sources.json

有关 dbt Cloud 工件的更多信息,请参阅此文档

tests/system/providers/dbt/cloud/example_dbt_cloud.py[源代码]

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)

列出作业

使用 DbtCloudListJobsOperator 列出与指定 dbt Cloud 帐户关联的所有作业。account_id 必须通过连接提供或作为参数提供给任务。

如果提供了 project_id,则只会检索与此项目 ID 相关的作业。

有关 dbt Cloud 列出作业的更多信息,请参阅此文档

tests/system/providers/dbt/cloud/example_dbt_cloud.py[源代码]

list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)

此条目有帮助吗?