dbt Cloud 操作符¶
这些操作符可以执行 dbt Cloud 作业,轮询当前正在执行的作业状态,并在本地下载运行工件。
每个操作符都可以通过两种方式绑定到特定的 dbt Cloud 账户
显式提供账户 ID(通过
account_id
参数)给操作符。或者,在 Airflow 连接中指定 dbt Cloud 账户。如果未将账户 ID 传递给操作符,则操作符将自动回退使用此配置。
触发 dbt Cloud 作业¶
使用 DbtCloudRunJobOperator
触发 dbt Cloud 作业运行。默认情况下,操作符将每隔 check_interval
秒定期检查执行作业的状态,直到作业达到 timeout
执行时长或以成功状态终止。此功能由 wait_for_termination
参数控制。或者,可以将 wait_for_termination
设置为 False 以执行异步等待(通常与 DbtCloudJobRunSensor
结合使用)。将 wait_for_termination
设置为 False 是处理长时间运行的 dbt Cloud 作业的好方法。
deferrable
参数与 wait_for_termination
参数一起控制是在 worker 上轮询作业状态还是使用 Triggerer 进行延迟的功能。当 wait_for_termination
为 True 且 deferrable
为 False 时,我们提交作业并在 worker 上 轮询
其状态。这将保持 worker 插槽被占用直到作业执行完成。当 wait_for_termination
为 True 且 deferrable
为 True 时,我们提交作业并使用 Triggerer 进行 延迟
。这将释放 worker 插槽,从而在作业运行时节省资源利用。
当 wait_for_termination
为 False 且 deferrable
为 False 时,我们只提交作业,并且只能使用 DbtCloudJobRunSensor
跟踪作业状态。
当 retry_from_failure
为 True 时,如果运行失败,我们将从失败点重试该作业的运行。否则,我们将触发新的运行。有关重试逻辑的更多信息,请参阅 API 文档。
虽然 schema_override
和 steps_override
是 DbtCloudRunJobOperator
的显式可选参数,但也可以使用 additional_run_config
字典将自定义运行配置传递给操作符。此参数可用于初始化作业运行的其他运行时配置或覆盖,例如 threads_override
、generate_docs_override
、git_branch
等。有关可以在运行时使用的其他配置的完整列表,请参阅 API 文档。
以下示例分别演示了如何实例化 DbtCloudRunJobOperator 任务,使用同步和异步等待运行终止。请注意,操作符的 account_id
在示例 DAG 的 default_args
中引用。
tests/system/dbt/cloud/example_dbt_cloud.py
trigger_job_run1 = DbtCloudRunJobOperator(
task_id="trigger_job_run1",
job_id=48617,
check_interval=10,
timeout=300,
)
下一个示例还展示了如何通过 additional_run_config
字典传入自定义运行时配置(在本例中用于 threads_override
)。
tests/system/dbt/cloud/example_dbt_cloud.py
trigger_job_run2 = DbtCloudRunJobOperator(
task_id="trigger_job_run2",
job_id=48617,
wait_for_termination=False,
additional_run_config={"threads_override": 8},
)
您也可以在不提供 job_id
的情况下触发 dbt Cloud 作业。作为替代,您可以通过提供 project_name
、environment_name
和 job_name
来标识作业。请注意,这仅在上述三个参数能够唯一标识您账户中的作业时才有效(即,您不能在同一项目和环境中拥有两个同名的作业)。
tests/system/dbt/cloud/example_dbt_cloud.py
trigger_job_run3 = DbtCloudRunJobOperator(
task_id="trigger_job_run3",
project_name="my_dbt_project",
environment_name="prod",
job_name="my_dbt_job",
check_interval=10,
timeout=300,
)
轮询 dbt Cloud 作业运行状态¶
使用 DbtCloudJobRunSensor
定期检索 dbt Cloud 作业运行的状态并检查运行是否成功。此 Sensor 提供了 BaseSensorOperator
可用的所有相同功能。
在下面的示例中,run_id
值来自先前 DbtCloudRunJobOperator 任务的输出,利用了所有操作符都公开的 .output
属性。另外请注意,该任务的 account_id
在示例 DAG 的 default_args
中引用。
tests/system/dbt/cloud/example_dbt_cloud.py
job_run_sensor = DbtCloudJobRunSensor(
task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
此外,您还可以使用 延迟
模式异步轮询作业运行状态。在此模式下,Sensor 运行时会释放 worker 插槽。
tests/system/dbt/cloud/example_dbt_cloud.py
job_run_sensor_deferred = DbtCloudJobRunSensor(
task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)
下载运行工件¶
使用 DbtCloudGetJobRunArtifactOperator
下载 dbt Cloud 作业运行生成的工件。指定的 path
值应以 target/
目录为根目录。典型工件包括 manifest.json
、catalog.json
和 run_results.json
,但也可以下载其他工件,例如模型的原始 SQL 或 sources.json
。
有关 dbt Cloud 工件的更多信息,请参阅此文档。
tests/system/dbt/cloud/example_dbt_cloud.py
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
列出作业¶
使用 DbtCloudListJobsOperator
列出与指定 dbt Cloud 账户关联的所有作业。account_id
必须通过连接提供或作为参数提供给任务。
如果提供了 project_id
,将只检索与此项目 ID 相关的作业。
有关列出 dbt Cloud 作业的更多信息,请参阅此文档。
tests/system/dbt/cloud/example_dbt_cloud.py
list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)