dbt Cloud 操作符

这些操作符可以执行 dbt Cloud 作业,轮询当前正在执行的作业的状态,并在本地下载运行工件。

每个操作符都可以通过两种方式绑定到特定的 dbt Cloud 帐户

  • 显式地将帐户 ID(通过 account_id 参数)提供给操作符。

  • 或者,在 Airflow 连接中指定 dbt Cloud 帐户。如果未将帐户 ID 传递给操作符,则操作符将自动回退到使用此帐户。

触发 dbt Cloud 作业

使用 DbtCloudRunJobOperator 来触发 dbt Cloud 作业的运行。默认情况下,操作符将定期检查已执行作业的状态,并以成功的状态终止,检查间隔为每 check_interval 秒,或者直到作业达到 timeout 执行时间长度。此功能由 wait_for_termination 参数控制。或者,可以将 wait_for_termination 设置为 False 来执行异步等待(通常与 DbtCloudJobRunSensor 配对)。将 wait_for_termination 设置为 False 对于长时间运行的 dbt Cloud 作业来说是一种很好的方法。

deferrable 参数以及 wait_for_termination 将控制是否在 worker 上轮询作业状态或使用 Triggerer 延迟的功能。当 wait_for_termination 为 True 且 deferrable 为 False 时,我们提交作业并在 worker 上 poll 其状态。这将使 worker 插槽保持占用状态,直到作业执行完成。当 wait_for_termination 为 True 且 deferrable 为 True 时,我们提交作业并使用 Triggerer defer。这将释放 worker 插槽,从而在作业运行时节省资源利用率。

wait_for_termination 为 False 且 deferrable 为 False 时,我们只是提交作业,并且只能使用 DbtCloudJobRunSensor跟踪作业状态。

retry_from_failure 为 True 时,如果运行失败,我们将从失败点重试作业的运行。否则,我们将触发新的运行。有关重试逻辑的更多信息,请参考 API 文档

虽然 schema_overridesteps_overrideDbtCloudRunJobOperator 的显式可选参数,但也可以使用 additional_run_config 字典将自定义运行配置传递给操作符。此参数可用于初始化作业运行的其他运行时配置或覆盖,例如 threads_overridegenerate_docs_overridegit_branch 等。有关可在运行时使用的其他配置的完整列表,请参考 API 文档

以下示例演示了如何分别使用同步和异步等待运行终止来实例化 DbtCloudRunJobOperator 任务。请注意,操作符的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/dbt/cloud/example_dbt_cloud.py[源代码]

trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)

下一个示例还展示了如何通过 additional_run_config 字典传递自定义运行时配置(在本例中为 threads_override)。

tests/system/dbt/cloud/example_dbt_cloud.py[源代码]

trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)

轮询 dbt Cloud 作业运行的状态

使用 DbtCloudJobRunSensor 定期检索 dbt Cloud 作业运行的状态,并检查运行是否成功。此传感器提供与 BaseSensorOperator 相同的所有功能。

在下面的示例中,下面的示例中的 run_id 值来自先前 DbtCloudRunJobOperator 任务的输出,方法是利用为所有操作符公开的 .output 属性。此外,请注意,任务的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/dbt/cloud/example_dbt_cloud.py[源代码]

job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)

此外,您可以使用 deferrable 模式异步轮询作业运行的状态。在此模式下,传感器运行时会释放 worker 插槽。

tests/system/dbt/cloud/example_dbt_cloud.py[源代码]

job_run_sensor_deferred = DbtCloudJobRunSensor(
    task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)

下载运行工件

使用 DbtCloudGetJobRunArtifactOperator 下载 dbt Cloud 作业运行的 dbt 生成的工件。指定的 path 值应以 target/ 目录为根。典型工件包括 manifest.jsoncatalog.jsonrun_results.json,但也可以下载其他工件,例如模型的原始 SQL 或 sources.json

有关 dbt Cloud 工件的更多信息,请参考 此文档

tests/system/dbt/cloud/example_dbt_cloud.py[源代码]

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)

列出作业

使用 DbtCloudListJobsOperator 列出与指定的 dbt Cloud 帐户关联的所有作业。account_id 必须通过连接提供或作为参数提供给任务。

如果提供了 project_id,则只会检索与此项目 ID 相关的作业。

有关 dbt Cloud 列出作业的更多信息,请参考 此文档

tests/system/dbt/cloud/example_dbt_cloud.py[源代码]

list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)

此条目是否有帮助?