回调

日志记录和监控的一个有价值的组件是使用任务回调来对给定任务或给定 DAG 中所有任务的状态变化采取行动。例如,您可能希望在某些任务失败时发出警报,或者让 DAG 中的最后一个任务在成功时调用回调。

注意

仅当任务状态由于工作器执行而发生更改时,才会调用回调函数。因此,命令行界面(CLI)或用户界面(UI)设置的任务更改不会执行回调函数。

警告

回调函数在任务完成后执行。回调函数中的错误将显示在调度器日志中,而不是任务日志中。默认情况下,调度器日志不会显示在 UI 中,而是在 $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log 中找到。

回调类型

有五种类型的任务事件可以触发回调

名称

描述

on_success_callback

当任务成功时调用

on_failure_callback

当任务失败时调用

sla_miss_callback

当任务错过其定义的SLA时调用

on_retry_callback

当任务准备重试时调用

on_execute_callback

在任务开始执行之前立即调用。

on_skipped_callback

当任务正在运行并且引发 AirflowSkipException 时调用。如果由于 DAG 中的先前分支决策或触发规则导致跳过执行而导致任务从未被调度执行,则明确不会调用。

示例

在以下示例中,任何任务中的失败都会调用 task_failure_alert 函数,而最后一个任务中的成功会调用 dag_success_alert 函数

import datetime
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

注意

从 Airflow 2.6.0 开始,回调现在支持回调函数列表,允许用户指定要在所需事件中执行的多个函数。只需在定义 DAG/任务回调时将回调函数列表传递给回调参数:例如, on_failure_callback=[callback_func_1, callback_func_2]

docscode 中, context 中可用的变量的完整列表。

此条目是否有帮助?