回调
日志和监控的一个重要组成部分是使用任务回调来对给定 Dag 或任务的状态变化,或对该 Dag 中所有任务的状态变化作出响应。例如,您可能希望在某些任务失败时发送警报,或在 Dag 成功时调用回调。
回调可以在三个不同的位置定义。
在 Dag 定义中设置的回调将应用于 Dag 级别。
使用
default_args,可以为 Dag 中的每个任务设置回调。可以在任务定义本身中设置该回调,为任务设置单独的回调。
警告
回调函数在任务完成后执行。回调函数中的错误会出现在 dag processor 日志而不是任务日志中。默认情况下,dag processor 日志不会显示在 UI 中,而是可以在 $AIRFLOW_HOME/logs/dag_processor/latest/dags-folder/<the_path_for_your_dag>/DAG_FILE.py.log 找到。
注意
从 Airflow 2.6.0 起,回调支持传入回调函数列表,允许用户为同一事件指定多个要执行的函数。只需在定义 Dag/任务回调时将回调函数列表传给相应的回调参数,例如 on_failure_callback=[callback_func_1, callback_func_2]。
回调类型
有六种事件类型可以触发回调
名称 |
描述 |
可用范围 |
|---|---|---|
|
Dag 或任务 |
|
|
Dag 或任务 |
|
|
当任务 进入重试状态 时被调用。 |
任务 |
|
在任务开始执行之前立即被调用。 |
任务 |
|
当任务 运行中 并抛出 |
任务 |
上下文映射
每个回调都会收到一个包含任务实例运行时信息的上下文映射。context 中可用的完整变量列表请参考 文档 与 代码。
Dag 回调
由于上下文映射描述了任务实例的执行,传递给 Dag 回调的上下文同样会包含任务实例变量,所选任务取决于 Dag 的状态。
在普通失败情况下,选择最近一次失败的任务。
在 Dag 运行超时的情况下,传递最近一次已启动但未完成的任务。
如果任务出现死锁,则传递本应下一步执行但未能执行的任务。
在成功时,传递最近一次成功的任务。
除用于人工分析外,不推荐在 Dag 回调中依赖任务实例变量,因为它们只反映 Dag 状态的部分信息。例如,超时可能由多个卡住的任务导致,但上下文只会选取其中一个。
注意
在 Airflow 3.2.0 之前,上述规则不适用,传递给 Dag 回调的任务实例与 Dag 状态无关,而是按字典序选择 Dag 中的最新任务。
示例
使用自定义回调方法
在下面的示例中,task1 失败时会调用 task_failure_alert 函数,而在 Dag 级别成功时会调用 dag_success_alert 函数。每个任务在开始执行前,都会调用 task_execute_callback 函数。
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
def task_execute_callback(context):
print(f"Task has begun execution, task_instance_key_str: {context['task_instance_key_str']}")
def task_failure_alert(context):
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
print(f"Dag has succeeded, run_id: {context['run_id']}")
with DAG(
dag_id="example_callback",
on_success_callback=dag_success_alert,
default_args={"on_execute_callback": task_execute_callback},
):
task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task1 >> task2 >> task3
使用通知器
您可以在 Dag 定义中通过将通知器作为 on_*_callbacks 参数传入来使用。例如,可将其与 on_success_callback 或 on_failure_callback 结合使用,以根据任务或 Dag 运行的状态发送通知。
下面是使用自定义通知器的示例
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from myprovider.notifier import MyNotifier
with DAG(
dag_id="example_notifier",
on_success_callback=MyNotifier(message="Success!"),
on_failure_callback=MyNotifier(message="Failure!"),
):
task = BashOperator(
task_id="example_task",
bash_command="exit 1",
on_success_callback=MyNotifier(message="Task Succeeded!"),
)
截止时间警报回调
除了上面的 Dag/任务生命周期回调外,Airflow 还支持 **截止时间警报** 回调,当 Dag 运行超过配置的时间阈值时触发。截止时间警报回调使用 AsyncCallback(在 Triggerer 中运行)或 SyncCallback(在执行器中运行),并通过 Dag 的 deadline 参数进行配置。
完整细节请参见 截止时间警报。