设置和拆除

在数据工作流中,通常会创建资源(例如计算资源),使用它完成一些工作,然后将其拆除。Airflow 提供设置和拆除任务来支持此需求。

设置和拆除任务的关键特性

  • 如果清除任务,则其设置和拆除也将被清除。

  • 默认情况下,在评估 DAG 运行状态时会忽略拆除任务。

  • 即使工作任务失败,如果设置成功,拆除任务也会运行。

  • 在针对任务组设置依赖项时,会忽略拆除任务。

设置和拆除的工作原理

基本用法

假设您有一个 DAG,它创建集群、运行查询并删除集群。如果不使用设置和拆除任务,则可以设置以下关系

create_cluster >> run_query >> delete_cluster

要启用 create_cluster 和 delete_cluster 作为设置和拆除任务,我们将它们标记为方法 as_setupas_teardown,并在它们之间添加上游/下游关系

create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster

为了方便起见,我们可以通过将 create_cluster 传递给 as_teardown 方法,在一行中完成此操作

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)

以下是此 DAG 的图表

../_images/setup-teardown-simple.png

观察结果

  • 如果清除 run_query 以再次运行它,则 create_clusterdelete_cluster 都将被清除。

  • 如果 run_query 失败,则 delete_cluster 仍将运行。

  • DAG 运行的成功将*仅*取决于 run_query 的成功。

此外,如果我们有多个任务要包装,我们可以使用拆除作为上下文管理器

with delete_cluster().as_teardown(setups=create_cluster()):
    [RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
    WorkOne() >> [do_this_stuff(), do_other_stuff()]

这将设置 create_cluster 在上下文中的任务之前运行,并在之后运行 delete_cluster。

这是图表中显示的内容

../_images/setup-teardown-complex.png

请注意,如果您尝试将已实例化的任务添加到设置上下文,则需要显式执行此操作

with my_teardown_task as scope:
    scope.add_task(work_task)  # work_task was already instantiated elsewhere

设置“范围”

设置与其拆除之间的任务位于设置/拆除对的“范围内”。

让我们看一个例子

s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4

和图表

../_images/setup-teardown-scope.png

在上面的示例中,w1w2 位于 s1t1“之间”,因此假定需要 s1。因此,如果清除 w1w2s1t1 也将被清除。但是,如果清除 w3w4,则 s1t1 都不会被清除。

您可以将多个设置任务连接到一个拆除。如果至少有一个设置成功完成,则拆除将运行。

您可以进行没有拆除的设置

create_cluster >> run_query >> other_task

在这种情况下,create_cluster 下游的所有内容都被假定需要它。因此,如果您清除 other_task,它也会清除 create_cluster。假设我们在 run_query 之后添加了 create_cluster 的拆除

create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)

现在,Airflow 可以推断 other_task 不需要 create_cluster,因此如果我们清除 other_task,create_cluster 也不会被清除。

在该示例中,我们(在我们假想的文档领域)实际上想要删除集群。但是,假设我们没有,并且我们只想说“other_task 不需要 create_cluster”,那么我们可以使用 EmptyOperator 来限制设置的范围

create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)

隐式 ALL_SUCCESS 约束

设置范围内的任何任务对其设置都有一个隐式的“all_success”约束。这是必要的,以确保如果清除具有间接设置的任务,它将等待它们完成。如果设置失败或被跳过,则依赖于它们的作业任务将被标记为询问失败或跳过。我们还要求设置的任何非拆除直接下游必须具有触发规则 ALL_SUCCESS。

控制 DAG 运行状态

设置/拆除任务的另一个特性是您可以选择拆除任务是否应该对 DAG 运行状态产生影响。也许您不关心拆除任务执行的“清理”工作是否失败,并且您只在“工作”任务失败时才将 DAG 运行视为失败。默认情况下,在确定 DAG 运行状态时不考虑拆除任务。

继续上面的示例,如果您希望运行的成功取决于 delete_cluster,则在将 delete_cluster 设置为拆除时设置 on_failure_fail_dagrun=True。例如

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)

使用任务组进行创作

从任务组指向任务组,或从任务组指向*任务*时,我们会忽略拆除。这允许拆除并行运行,并允许 DAG 执行继续进行,即使拆除任务失败。

考虑这个例子

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2

图表

../_images/setup-teardown-group.png

如果 t1 不是拆除任务,则此 DAG 实际上将是 s1 >> w1 >> t1 >> w2。但是,由于我们将 t1 标记为拆除,因此在 tg >> w2 中会忽略它。所以 DAG 等价于以下内容

s1 >> w1 >> [t1.as_teardown(setups=s1), w2]

现在让我们考虑一个嵌套的例子

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
dag_s1 = dag_s1()
dag_t1 = dag_t1()
dag_s1 >> [tg, w2] >> dag_t1.as_teardown(setups=dag_s1)

图表

../_images/setup-teardown-nesting.png

在这个例子中,s1dag_s1 的下游,所以它必须等待 dag_s1 成功完成。但是 t1dag_t1 可以并发运行,因为在表达式 tg >> dag_t1 中忽略了 t1。如果清除 w2,它将清除 dag_s1dag_t1,但不会清除任务组中的任何内容。

并行运行设置和拆除

您可以并行运行设置任务

(
    [create_cluster, create_bucket]
    >> run_query
    >> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)

图表

../_images/setup-teardown-parallel.png

将它们放在一个组中在视觉上会很好

with TaskGroup("setup") as tg_s:
    create_cluster = create_cluster()
    create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
    delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
    delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t

和图表

../_images/setup-teardown-setup-group.png

拆除的触发规则行为

拆除使用称为 ALL_DONE_SETUP_SUCCESS 的(不可配置)触发规则。使用此规则,只要所有上游都已完成并且至少有一个直接连接的设置成功,拆除就会运行。如果拆除的所有设置都被跳过或失败,则这些状态将传播到拆除。

此条目有帮助吗?