dbt Cloud 操作符

这些操作符可以执行 dbt Cloud 作业,轮询当前正在执行的作业状态,并在本地下载运行工件。

每个操作符都可以通过两种方式绑定到特定的 dbt Cloud 账户

  • 显式提供账户 ID(通过 account_id 参数)给操作符。

  • 或者,在 Airflow 连接中指定 dbt Cloud 账户。如果未将账户 ID 传递给操作符,则操作符将自动回退使用此配置。

触发 dbt Cloud 作业

使用 DbtCloudRunJobOperator 触发 dbt Cloud 作业运行。默认情况下,操作符将每隔 check_interval 秒定期检查执行作业的状态,直到作业达到 timeout 执行时长或以成功状态终止。此功能由 wait_for_termination 参数控制。或者,可以将 wait_for_termination 设置为 False 以执行异步等待(通常与 DbtCloudJobRunSensor 结合使用)。将 wait_for_termination 设置为 False 是处理长时间运行的 dbt Cloud 作业的好方法。

deferrable 参数与 wait_for_termination 参数一起控制是在 worker 上轮询作业状态还是使用 Triggerer 进行延迟的功能。当 wait_for_termination 为 True 且 deferrable 为 False 时,我们提交作业并在 worker 上 轮询 其状态。这将保持 worker 插槽被占用直到作业执行完成。当 wait_for_termination 为 True 且 deferrable 为 True 时,我们提交作业并使用 Triggerer 进行 延迟。这将释放 worker 插槽,从而在作业运行时节省资源利用。

wait_for_termination 为 False 且 deferrable 为 False 时,我们只提交作业,并且只能使用 DbtCloudJobRunSensor 跟踪作业状态。

retry_from_failure 为 True 时,如果运行失败,我们将从失败点重试该作业的运行。否则,我们将触发新的运行。有关重试逻辑的更多信息,请参阅 API 文档

虽然 schema_overridesteps_overrideDbtCloudRunJobOperator 的显式可选参数,但也可以使用 additional_run_config 字典将自定义运行配置传递给操作符。此参数可用于初始化作业运行的其他运行时配置或覆盖,例如 threads_overridegenerate_docs_overridegit_branch 等。有关可以在运行时使用的其他配置的完整列表,请参阅 API 文档

以下示例分别演示了如何实例化 DbtCloudRunJobOperator 任务,使用同步和异步等待运行终止。请注意,操作符的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/dbt/cloud/example_dbt_cloud.py

trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)

下一个示例还展示了如何通过 additional_run_config 字典传入自定义运行时配置(在本例中用于 threads_override)。

tests/system/dbt/cloud/example_dbt_cloud.py

trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)

您也可以在不提供 job_id 的情况下触发 dbt Cloud 作业。作为替代,您可以通过提供 project_nameenvironment_namejob_name 来标识作业。请注意,这仅在上述三个参数能够唯一标识您账户中的作业时才有效(即,您不能在同一项目和环境中拥有两个同名的作业)。

tests/system/dbt/cloud/example_dbt_cloud.py

trigger_job_run3 = DbtCloudRunJobOperator(
    task_id="trigger_job_run3",
    project_name="my_dbt_project",
    environment_name="prod",
    job_name="my_dbt_job",
    check_interval=10,
    timeout=300,
)

轮询 dbt Cloud 作业运行状态

使用 DbtCloudJobRunSensor 定期检索 dbt Cloud 作业运行的状态并检查运行是否成功。此 Sensor 提供了 BaseSensorOperator 可用的所有相同功能。

在下面的示例中,run_id 值来自先前 DbtCloudRunJobOperator 任务的输出,利用了所有操作符都公开的 .output 属性。另外请注意,该任务的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/dbt/cloud/example_dbt_cloud.py

job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)

此外,您还可以使用 延迟 模式异步轮询作业运行状态。在此模式下,Sensor 运行时会释放 worker 插槽。

tests/system/dbt/cloud/example_dbt_cloud.py

job_run_sensor_deferred = DbtCloudJobRunSensor(
    task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)

下载运行工件

使用 DbtCloudGetJobRunArtifactOperator 下载 dbt Cloud 作业运行生成的工件。指定的 path 值应以 target/ 目录为根目录。典型工件包括 manifest.jsoncatalog.jsonrun_results.json,但也可以下载其他工件,例如模型的原始 SQL 或 sources.json

有关 dbt Cloud 工件的更多信息,请参阅此文档

tests/system/dbt/cloud/example_dbt_cloud.py

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)

列出作业

使用 DbtCloudListJobsOperator 列出与指定 dbt Cloud 账户关联的所有作业。account_id 必须通过连接提供或作为参数提供给任务。

如果提供了 project_id,将只检索与此项目 ID 相关的作业。

有关列出 dbt Cloud 作业的更多信息,请参阅此文档

tests/system/dbt/cloud/example_dbt_cloud.py

list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)

此条目有帮助吗?