回调

日志和监控的一个重要组成部分是使用任务回调来对给定 Dag 或任务的状态变化,或对该 Dag 中所有任务的状态变化作出响应。例如,您可能希望在某些任务失败时发送警报,或在 Dag 成功时调用回调。

回调可以在三个不同的位置定义。

  • 在 Dag 定义中设置的回调将应用于 Dag 级别。

  • 使用 default_args,可以为 Dag 中的每个任务设置回调。

  • 可以在任务定义本身中设置该回调,为任务设置单独的回调。

注意

回调函数仅在 Dag 或任务状态因 worker 执行而改变时被调用。因此,通过CLIUI设置的 Dag 和任务更改不会执行回调函数。

警告

回调函数在任务完成后执行。回调函数中的错误会出现在 dag processor 日志而不是任务日志中。默认情况下,dag processor 日志不会显示在 UI 中,而是可以在 $AIRFLOW_HOME/logs/dag_processor/latest/dags-folder/<the_path_for_your_dag>/DAG_FILE.py.log 找到。

注意

从 Airflow 2.6.0 起,回调支持传入回调函数列表,允许用户为同一事件指定多个要执行的函数。只需在定义 Dag/任务回调时将回调函数列表传给相应的回调参数,例如 on_failure_callback=[callback_func_1, callback_func_2]

回调类型

有六种事件类型可以触发回调

名称

描述

可用范围

on_success_callback

Dag 成功任务成功 时被调用。

Dag 或任务

on_failure_callback

Dag 失败 或任务 失败 时被调用。

Dag 或任务

on_retry_callback

当任务 进入重试状态 时被调用。

任务

on_execute_callback

在任务开始执行之前立即被调用。

任务

on_skipped_callback

当任务 运行中 并抛出 AirflowSkipException 时被调用。需要明确的是,如果任务因为 Dag 中前置分支决定或触发规则导致被跳过而根本未被调度执行,则不会调用此回调。

任务

上下文映射

每个回调都会收到一个包含任务实例运行时信息的上下文映射。context 中可用的完整变量列表请参考 文档代码

Dag 回调

由于上下文映射描述了任务实例的执行,传递给 Dag 回调的上下文同样会包含任务实例变量,所选任务取决于 Dag 的状态。

  1. 在普通失败情况下,选择最近一次失败的任务。

  2. 在 Dag 运行超时的情况下,传递最近一次已启动但未完成的任务。

  3. 如果任务出现死锁,则传递本应下一步执行但未能执行的任务。

  4. 在成功时,传递最近一次成功的任务。

除用于人工分析外,不推荐在 Dag 回调中依赖任务实例变量,因为它们只反映 Dag 状态的部分信息。例如,超时可能由多个卡住的任务导致,但上下文只会选取其中一个。

注意

在 Airflow 3.2.0 之前,上述规则不适用,传递给 Dag 回调的任务实例与 Dag 状态无关,而是按字典序选择 Dag 中的最新任务。

示例

使用自定义回调方法

在下面的示例中,task1 失败时会调用 task_failure_alert 函数,而在 Dag 级别成功时会调用 dag_success_alert 函数。每个任务在开始执行前,都会调用 task_execute_callback 函数。

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator


def task_execute_callback(context):
    print(f"Task has begun execution, task_instance_key_str: {context['task_instance_key_str']}")


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"Dag has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    on_success_callback=dag_success_alert,
    default_args={"on_execute_callback": task_execute_callback},
):
    task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task1 >> task2 >> task3

使用通知器

您可以在 Dag 定义中通过将通知器作为 on_*_callbacks 参数传入来使用。例如,可将其与 on_success_callbackon_failure_callback 结合使用,以根据任务或 Dag 运行的状态发送通知。

下面是使用自定义通知器的示例

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

from myprovider.notifier import MyNotifier

with DAG(
    dag_id="example_notifier",
    on_success_callback=MyNotifier(message="Success!"),
    on_failure_callback=MyNotifier(message="Failure!"),
):
    task = BashOperator(
        task_id="example_task",
        bash_command="exit 1",
        on_success_callback=MyNotifier(message="Task Succeeded!"),
    )

想查看社区维护的通知器列表,请参阅 通知。有关编写自定义通知器的更多信息,请参见 通知器 操作指南。

截止时间警报回调

除了上面的 Dag/任务生命周期回调外,Airflow 还支持 **截止时间警报** 回调,当 Dag 运行超过配置的时间阈值时触发。截止时间警报回调使用 AsyncCallback(在 Triggerer 中运行)或 SyncCallback(在执行器中运行),并通过 Dag 的 deadline 参数进行配置。

完整细节请参见 截止时间警报

此条目是否有帮助?