设置和拆除¶
在数据工作流中,通常会创建资源(例如计算资源),使用它完成一些工作,然后将其拆除。Airflow 提供设置和拆除任务来支持此需求。
设置和拆除任务的关键特性
如果清除任务,则其设置和拆除也将被清除。
默认情况下,在评估 DAG 运行状态时会忽略拆除任务。
即使工作任务失败,如果设置成功,拆除任务也会运行。
在针对任务组设置依赖项时,会忽略拆除任务。
设置和拆除的工作原理¶
基本用法¶
假设您有一个 DAG,它创建集群、运行查询并删除集群。如果不使用设置和拆除任务,则可以设置以下关系
create_cluster >> run_query >> delete_cluster
要启用 create_cluster 和 delete_cluster 作为设置和拆除任务,我们将它们标记为方法 as_setup
和 as_teardown
,并在它们之间添加上游/下游关系
create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster
为了方便起见,我们可以通过将 create_cluster
传递给 as_teardown
方法,在一行中完成此操作
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)
以下是此 DAG 的图表
观察结果
如果清除
run_query
以再次运行它,则create_cluster
和delete_cluster
都将被清除。如果
run_query
失败,则delete_cluster
仍将运行。DAG 运行的成功将*仅*取决于
run_query
的成功。
此外,如果我们有多个任务要包装,我们可以使用拆除作为上下文管理器
with delete_cluster().as_teardown(setups=create_cluster()):
[RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
WorkOne() >> [do_this_stuff(), do_other_stuff()]
这将设置 create_cluster 在上下文中的任务之前运行,并在之后运行 delete_cluster。
这是图表中显示的内容
请注意,如果您尝试将已实例化的任务添加到设置上下文,则需要显式执行此操作
with my_teardown_task as scope:
scope.add_task(work_task) # work_task was already instantiated elsewhere
设置“范围”¶
设置与其拆除之间的任务位于设置/拆除对的“范围内”。
让我们看一个例子
s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4
和图表
在上面的示例中,w1
和 w2
位于 s1
和 t1
“之间”,因此假定需要 s1
。因此,如果清除 w1
或 w2
,s1
和 t1
也将被清除。但是,如果清除 w3
或 w4
,则 s1
和 t1
都不会被清除。
您可以将多个设置任务连接到一个拆除。如果至少有一个设置成功完成,则拆除将运行。
您可以进行没有拆除的设置
create_cluster >> run_query >> other_task
在这种情况下,create_cluster 下游的所有内容都被假定需要它。因此,如果您清除 other_task,它也会清除 create_cluster。假设我们在 run_query 之后添加了 create_cluster 的拆除
create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)
现在,Airflow 可以推断 other_task 不需要 create_cluster,因此如果我们清除 other_task,create_cluster 也不会被清除。
在该示例中,我们(在我们假想的文档领域)实际上想要删除集群。但是,假设我们没有,并且我们只想说“other_task 不需要 create_cluster”,那么我们可以使用 EmptyOperator 来限制设置的范围
create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)
隐式 ALL_SUCCESS 约束¶
设置范围内的任何任务对其设置都有一个隐式的“all_success”约束。这是必要的,以确保如果清除具有间接设置的任务,它将等待它们完成。如果设置失败或被跳过,则依赖于它们的作业任务将被标记为询问失败或跳过。我们还要求设置的任何非拆除直接下游必须具有触发规则 ALL_SUCCESS。
控制 DAG 运行状态¶
设置/拆除任务的另一个特性是您可以选择拆除任务是否应该对 DAG 运行状态产生影响。也许您不关心拆除任务执行的“清理”工作是否失败,并且您只在“工作”任务失败时才将 DAG 运行视为失败。默认情况下,在确定 DAG 运行状态时不考虑拆除任务。
继续上面的示例,如果您希望运行的成功取决于 delete_cluster
,则在将 delete_cluster
设置为拆除时设置 on_failure_fail_dagrun=True
。例如
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)
使用任务组进行创作¶
从任务组指向任务组,或从任务组指向*任务*时,我们会忽略拆除。这允许拆除并行运行,并允许 DAG 执行继续进行,即使拆除任务失败。
考虑这个例子
with TaskGroup("my_group") as tg:
s1 = s1()
w1 = w1()
t1 = t1()
s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
图表
如果 t1
不是拆除任务,则此 DAG 实际上将是 s1 >> w1 >> t1 >> w2
。但是,由于我们将 t1
标记为拆除,因此在 tg >> w2
中会忽略它。所以 DAG 等价于以下内容
s1 >> w1 >> [t1.as_teardown(setups=s1), w2]
现在让我们考虑一个嵌套的例子
with TaskGroup("my_group") as tg:
s1 = s1()
w1 = w1()
t1 = t1()
s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
dag_s1 = dag_s1()
dag_t1 = dag_t1()
dag_s1 >> [tg, w2] >> dag_t1.as_teardown(setups=dag_s1)
图表
在这个例子中,s1
在 dag_s1
的下游,所以它必须等待 dag_s1
成功完成。但是 t1
和 dag_t1
可以并发运行,因为在表达式 tg >> dag_t1
中忽略了 t1
。如果清除 w2
,它将清除 dag_s1
和 dag_t1
,但不会清除任务组中的任何内容。
并行运行设置和拆除¶
您可以并行运行设置任务
(
[create_cluster, create_bucket]
>> run_query
>> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)
图表
将它们放在一个组中在视觉上会很好
with TaskGroup("setup") as tg_s:
create_cluster = create_cluster()
create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t
和图表
拆除的触发规则行为¶
拆除使用称为 ALL_DONE_SETUP_SUCCESS 的(不可配置)触发规则。使用此规则,只要所有上游都已完成并且至少有一个直接连接的设置成功,拆除就会运行。如果拆除的所有设置都被跳过或失败,则这些状态将传播到拆除。