在数据流水线中,通常需要在执行实际“工作”之前创建基础设施资源,例如集群或现有集群中的 GPU 节点,并在工作完成后将其删除。Airflow 2.7 引入了“setup”(设置)和“teardown”(拆卸)任务,以更好地支持此类流水线。本文旨在突出关键特性,让您了解可能的操作。有关如何使用设置和拆卸任务的完整文档,请参阅设置和拆卸文档

为什么使用设置和拆卸?

在深入示例之前,让我先从宏观层面说明设置和拆卸能带来什么。

更具表现力的依赖关系

在设置和拆卸之前,上游和下游关系只能表示一种含义:“这在那之前”。有了设置和拆卸,实际上我们可以说“这需要那”。实际含义是,如果您清除一个任务,而该任务需要一个设置任务,那么该设置任务也会被清除。如果该设置任务还有相应的拆卸任务,则拆卸任务也会再次执行。

将工作与基础设施分离

有时您关心的 DAG 部分并不是清理任务。例如,假设您有一个 DAG 用于加载数据,然后删除临时文件。只要数据加载成功,您希望 DAG 被标记为成功。默认情况下,拆卸任务就是这样工作的;即在确定 DAG 运行状态时会被忽略。

简单案例

一个简单示例包含一对设置/拆卸任务,以及一个普通的“工作”任务。

Simple setup and teardown example

设置任务和拆卸任务分别用向上和向下的箭头表示。从中我们可以看到 .create_cluster 是一个设置任务,而 delete_cluster 是一个拆卸任务。设置与拆卸之间的关联始终使用虚线,以突出其特殊关系。

需要注意的几点

  • 如果 create_cluster 失败,则 run_querydelete_cluster 都不会运行。
  • 如果 create_cluster 成功而 run_query 失败,则 delete_cluster 仍会运行。
  • 如果 create_cluster 被跳过,run_querydelete_cluster 也会被跳过
  • 默认情况下,如果 run_query 成功而 delete_cluster 失败,DAG 运行仍会被标记为成功。(此行为可被覆盖。)

使用任务组编写

当我们在任务组的下游设置某些内容时,任务组中的所有拆卸任务都会被忽略。这反映了一般假设:我们通常不希望仅因为拆卸任务失败而停止 DAG 执行。因此,让我们将上述 DAG 包裹在任务组中,观察会发生什么。

Setup and teardown in task groups

下面展示了我们在代码中如何链接这些组

with TaskGroup("do_emr") as do_emr:
    create_cluster_task = create_cluster()
    run_query(create_cluster_task) >> delete_cluster(create_cluster_task)

with TaskGroup("load") as load:
    create_config_task = create_configuration()
    load_data(create_config_task) >> delete_configuration(create_config_task)

do_emr >> load

在这段代码中,每个组都有一个拆卸任务,我们仅将第一个组的箭头指向第二个组。正如预期的那样,delete_cluster(拆卸任务)被忽略。这带来两个重要后果:其一,即使它失败,load 组仍会运行;其二,delete_clustercreate_configuration 可以并行执行(一般来说,我们认为您不想在继续 DAG 中的其他任务之前等待拆卸操作完成)。当然,您可以通过在 delete_clustercreate_configuration 之间添加一条箭头来覆盖此行为。此外,该 DAG 的成功仅取决于 load_data 任务是否成功完成。

结论

我们这里省略了很多关于如何编写包含设置和拆卸任务的 DAG 的细节,详细内容请参阅设置和拆卸文档。但希望这篇文章能让您对设置和拆卸任务的可能性有足够了解,从而开始思考它们如何改进 Airflow 中的数据流水线。

想了解 Airflow 2.7 还有哪些新特性吗?请前往主页面Airflow 2.7 博客文章了解详情!

致谢

Setup and Teardown 源自 AIP-52。感谢所有为其做出贡献的人员,包括阅读和投票的同仁。特别感谢 Ash Berlin‑Taylor、Brent Bovenzi、Daniel Standish、Ephraim Anierobi、Jed Cunningham、Rahul Vats 和 Vikram Koka。

分享