在数据流水线中,通常需要在执行实际“工作”之前创建基础设施资源,例如集群或现有集群中的 GPU 节点,并在工作完成后将其删除。Airflow 2.7 引入了“setup”(设置)和“teardown”(拆卸)任务,以更好地支持此类流水线。本文旨在突出关键特性,让您了解可能的操作。有关如何使用设置和拆卸任务的完整文档,请参阅设置和拆卸文档。
为什么使用设置和拆卸?
在深入示例之前,让我先从宏观层面说明设置和拆卸能带来什么。
更具表现力的依赖关系
在设置和拆卸之前,上游和下游关系只能表示一种含义:“这在那之前”。有了设置和拆卸,实际上我们可以说“这需要那”。实际含义是,如果您清除一个任务,而该任务需要一个设置任务,那么该设置任务也会被清除。如果该设置任务还有相应的拆卸任务,则拆卸任务也会再次执行。
将工作与基础设施分离
有时您关心的 DAG 部分并不是清理任务。例如,假设您有一个 DAG 用于加载数据,然后删除临时文件。只要数据加载成功,您希望 DAG 被标记为成功。默认情况下,拆卸任务就是这样工作的;即在确定 DAG 运行状态时会被忽略。
简单案例
一个简单示例包含一对设置/拆卸任务,以及一个普通的“工作”任务。

设置任务和拆卸任务分别用向上和向下的箭头表示。从中我们可以看到 .create_cluster 是一个设置任务,而 delete_cluster 是一个拆卸任务。设置与拆卸之间的关联始终使用虚线,以突出其特殊关系。
需要注意的几点
- 如果
create_cluster失败,则run_query和delete_cluster都不会运行。 - 如果
create_cluster成功而run_query失败,则delete_cluster仍会运行。 - 如果
create_cluster被跳过,run_query和delete_cluster也会被跳过 - 默认情况下,如果
run_query成功而delete_cluster失败,DAG 运行仍会被标记为成功。(此行为可被覆盖。)
使用任务组编写
当我们在任务组的下游设置某些内容时,任务组中的所有拆卸任务都会被忽略。这反映了一般假设:我们通常不希望仅因为拆卸任务失败而停止 DAG 执行。因此,让我们将上述 DAG 包裹在任务组中,观察会发生什么。

下面展示了我们在代码中如何链接这些组
with TaskGroup("do_emr") as do_emr:
create_cluster_task = create_cluster()
run_query(create_cluster_task) >> delete_cluster(create_cluster_task)
with TaskGroup("load") as load:
create_config_task = create_configuration()
load_data(create_config_task) >> delete_configuration(create_config_task)
do_emr >> load
在这段代码中,每个组都有一个拆卸任务,我们仅将第一个组的箭头指向第二个组。正如预期的那样,delete_cluster(拆卸任务)被忽略。这带来两个重要后果:其一,即使它失败,load 组仍会运行;其二,delete_cluster 与 create_configuration 可以并行执行(一般来说,我们认为您不想在继续 DAG 中的其他任务之前等待拆卸操作完成)。当然,您可以通过在 delete_cluster 与 create_configuration 之间添加一条箭头来覆盖此行为。此外,该 DAG 的成功仅取决于 load_data 任务是否成功完成。
结论
我们这里省略了很多关于如何编写包含设置和拆卸任务的 DAG 的细节,详细内容请参阅设置和拆卸文档。但希望这篇文章能让您对设置和拆卸任务的可能性有足够了解,从而开始思考它们如何改进 Airflow 中的数据流水线。
想了解 Airflow 2.7 还有哪些新特性吗?请前往主页面Airflow 2.7 博客文章了解详情!
致谢
Setup and Teardown 源自 AIP-52。感谢所有为其做出贡献的人员,包括阅读和投票的同仁。特别感谢 Ash Berlin‑Taylor、Brent Bovenzi、Daniel Standish、Ephraim Anierobi、Jed Cunningham、Rahul Vats 和 Vikram Koka。
分享