dbt Cloud 操作器¶
这些操作器可以执行 dbt Cloud 作业,轮询当前正在执行的作业的状态,并在本地下载运行工件。
每个操作器可以通过两种方式绑定到特定的 dbt Cloud 帐户
将帐户 ID(通过
account_id
参数)显式提供给操作器。或者,在 Airflow 连接中指定 dbt Cloud 帐户。如果没有将帐户 ID 传递给操作器,操作器将自动回退使用此帐户。
触发 dbt Cloud 作业¶
使用 DbtCloudRunJobOperator
触发 dbt Cloud 作业的运行。默认情况下,操作器将定期检查已执行作业的状态,每隔 check_interval
秒或直到作业达到 timeout
执行时间长度后以成功状态终止。此功能由 wait_for_termination
参数控制。或者,可以将 wait_for_termination
设置为 False 以执行异步等待(通常与 DbtCloudJobRunSensor
配对使用)。对于长时间运行的 dbt Cloud 作业,将 wait_for_termination
设置为 False 是一种很好的方法。
deferrable
参数与 wait_for_termination
将控制是在工作器上轮询作业状态还是使用触发器延迟的功能。当 wait_for_termination
为 True 且 deferrable
为 False 时,我们提交作业并在工作器上轮询
其状态。这将使工作器插槽保持占用状态,直到作业执行完成。当 wait_for_termination
为 True 且 deferrable
为 True 时,我们提交作业并使用触发器延迟
。这将释放工作器插槽,从而在作业运行时节省资源利用率。
当 wait_for_termination
为 False 且 deferrable
为 False 时,我们只提交作业,并且只能使用 DbtCloudJobRunSensor
跟踪作业状态。
当 retry_from_failure
为 True 时,如果运行失败,我们将从失败点开始重试作业的运行。否则,我们将触发新的运行。有关重试逻辑的更多信息,请参阅API 文档。
虽然 schema_override
和 steps_override
是 DbtCloudRunJobOperator
的显式可选参数,但也可以使用 additional_run_config
字典将自定义运行配置传递给操作器。此参数可用于初始化作业运行的其他运行时配置或覆盖,例如 threads_override
、generate_docs_override
、git_branch
等。有关可在运行时使用的其他配置的完整列表,请参阅API 文档。
以下示例演示了如何分别使用同步和异步等待运行终止来实例化 DbtCloudRunJobOperator 任务。需要注意的是,操作器的 account_id
在示例 DAG 的 default_args
中引用。
trigger_job_run1 = DbtCloudRunJobOperator(
task_id="trigger_job_run1",
job_id=48617,
check_interval=10,
timeout=300,
)
下一个示例还展示了如何通过 additional_run_config
字典传入自定义运行时配置(在本例中为 threads_override
)。
trigger_job_run2 = DbtCloudRunJobOperator(
task_id="trigger_job_run2",
job_id=48617,
wait_for_termination=False,
additional_run_config={"threads_override": 8},
)
轮询 dbt Cloud 作业运行的状态¶
使用 DbtCloudJobRunSensor
定期检索 dbt Cloud 作业运行的状态,并检查运行是否已成功。此传感器提供了 BaseSensorOperator
可用的所有相同功能。
在下面的示例中,下面示例中的 run_id
值来自先前 DbtCloudRunJobOperator 任务的输出,方法是利用为所有操作器公开的 .output
属性。此外,需要注意的是,任务的 account_id
在示例 DAG 的 default_args
中引用。
job_run_sensor = DbtCloudJobRunSensor(
task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
此外,您还可以使用 deferrable
模式异步轮询作业运行的状态。在此模式下,工作器插槽在传感器运行时被释放。
job_run_sensor_deferred = DbtCloudJobRunSensor(
task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)
下载运行工件¶
使用 DbtCloudGetJobRunArtifactOperator
下载 dbt Cloud 作业运行的 dbt 生成的工件。指定的 path
值应以 target/
目录为根目录。典型的工件包括 manifest.json
、catalog.json
和 run_results.json
,但也可以下载其他工件,例如模型的原始 SQL 或 sources.json
。
有关 dbt Cloud 工件的更多信息,请参阅此文档。
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
列出作业¶
使用 DbtCloudListJobsOperator
列出与指定 dbt Cloud 帐户关联的所有作业。account_id
必须通过连接提供或作为参数提供给任务。
如果提供了 project_id
,则只会检索与此项目 ID 相关的作业。
有关 dbt Cloud 列出作业的更多信息,请参阅此文档。
list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)