回调¶
日志记录和监控的一个有价值的组件是使用任务回调来对给定任务或给定 DAG 中所有任务的状态变化采取行动。例如,您可能希望在某些任务失败时发出警报,或者让 DAG 中的最后一个任务在成功时调用回调。
警告
回调函数在任务完成后执行。回调函数中的错误将显示在调度器日志中,而不是任务日志中。默认情况下,调度器日志不会显示在 UI 中,而是在 $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log
中找到。
回调类型¶
有五种类型的任务事件可以触发回调
名称 |
描述 |
---|---|
|
当任务成功时调用 |
|
当任务失败时调用 |
|
当任务错过其定义的SLA时调用 |
|
当任务准备重试时调用 |
|
在任务开始执行之前立即调用。 |
|
当任务正在运行并且引发 AirflowSkipException 时调用。如果由于 DAG 中的先前分支决策或触发规则导致跳过执行而导致任务从未被调度执行,则明确不会调用。 |
示例¶
在以下示例中,任何任务中的失败都会调用 task_failure_alert
函数,而最后一个任务中的成功会调用 dag_success_alert
函数
import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
def task_failure_alert(context):
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
print(f"DAG has succeeded, run_id: {context['run_id']}")
with DAG(
dag_id="example_callback",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
catchup=False,
on_success_callback=None,
on_failure_callback=task_failure_alert,
tags=["example"],
):
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
task1 >> task2 >> task3
注意
从 Airflow 2.6.0 开始,回调现在支持回调函数列表,允许用户指定要在所需事件中执行的多个函数。只需在定义 DAG/任务回调时将回调函数列表传递给回调参数:例如, on_failure_callback=[callback_func_1, callback_func_2]