Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

监听器

你可以编写监听器,以便 Airflow 在事件发生时通知你。这些监听器由 Pluggy 提供支持。

警告

监听器是 Airflow 的高级特性。它们与运行它们的 Airflow 组件并非隔离,可能会降低 Airflow 实例的性能,甚至在某些情况下导致实例崩溃。因此,编写监听器时应格外小心。

Airflow 支持以下事件的通知

生命周期事件

  • on_starting

  • before_stopping

生命周期事件允许你对 Airflow Job(例如 SchedulerJob)的启动和停止事件作出反应。

DagRun 状态变化事件

DagRun 改变状态时,会发生 DagRun 状态变化事件。从 Airflow 3 开始,当状态变化通过 API 触发时(对于 on_dag_run_successon_dag_run_failed),监听器也会收到通知,例如当在 Airflow UI 中将 DagRun 标记为成功时。

  • on_dag_run_running

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run  in running state")
    queued_at = dag_run.queued_at

    version = dag_run.version_number

    print(f"Dag information Queued at: {queued_at} version: {version}")


  • on_dag_run_success

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date

    print(f"Dag run start:{start_date} end:{end_date}")


  • on_dag_run_failed

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run  in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    run_type = dag_run.run_type

    print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
    print(f"Failed with message: {msg}")


TaskInstance 状态变化事件

RuntimeTaskInstance 改变状态时,会发生 TaskInstance 状态变化事件。你可以使用这些事件对 LocalTaskJob 状态变化作出反应。从 Airflow 3 开始,当状态变化通过 API 触发时(对于 on_task_instance_successon_task_instance_failed),监听器也会收到通知,例如当在 Airflow UI 中将任务实例标记为成功时。在这种情况下,监听器将收到一个 TaskInstance 实例,而不是 RuntimeTaskInstance 实例。

  • on_task_instance_running

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance):
    """
    Called when task state changes to RUNNING.

    previous_task_state and task_instance object can be used to retrieve more information about current
    task_instance that is running, its dag_run, task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    name: str = task_instance.task_id

    context = task_instance.get_template_context()

    task = context["task"]

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name}")
    print(f"Dag name:{dag_name}")


  • on_task_instance_success

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance | TaskInstance
):
    """
    Called when task state changes to SUCCESS.

    previous_task_state and task_instance object can be used to retrieve more information about current
    task_instance that has succeeded, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)

    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")

        print(f"Task operator:{task_instance.operator}")
        return

    context = task_instance.get_template_context()
    operator = context["task"]

    print(f"Task operator:{operator}")


  • on_task_instance_failed

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState,
    task_instance: RuntimeTaskInstance | TaskInstance,
    error: None | str | BaseException,
):
    """
    Called when task state changes to FAILED.

    previous_task_state, task_instance object and error can be used to retrieve more information about current
    task_instance that has failed, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in failure state")

    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")

        print(f"Task operator:{task_instance.operator}")
        if error:
            print(f"Failure caused by {error}")
        return

    context = task_instance.get_template_context()
    task = context["task"]

    if TYPE_CHECKING:
        assert task

    print("Task start")
    print(f"Task:{task}")
    if error:
        print(f"Failure caused by {error}")


资产事件

  • on_asset_created

  • on_asset_alias_created

  • on_asset_changed

当资产管理操作运行时,会发生资产事件。

Dag 导入错误事件

  • on_new_dag_import_error

  • on_existing_dag_import_error

当 dag 处理器在 Dag 代码中发现导入错误并更新元数据数据库表时,会发生 Dag 导入错误事件。

这是一项实验性功能

使用方法

创建监听器

  • import airflow.listeners.hookimpl

  • 实现你想要生成通知的事件的 hookimpls

Airflow 将规范定义为 hookspec。你的实现必须接受与 hookspec 中定义的同名参数。如果你使用的参数与 hookspec 不同,Pluggy 会在你尝试使用插件时抛出错误。但你不需要实现每个方法。许多监听器只实现一个或一部分方法。

要在你的 Airflow 安装中包含监听器,将其作为 Airflow 插件的一部分。

监听器 API 旨在跨所有 dags 和所有操作器调用。你无法监听由特定 dags 生成的事件。对于这种行为,可以尝试使用 on_success_callbackpre_execute 等方法。这些方法为特定的 DAG 作者或操作器创建者提供回调。日志和 print() 调用将作为监听器的一部分进行处理。

兼容性说明

监听器接口可能会随时间变化。我们使用 pluggy 规范,这意味着为旧版接口编写的监听器实现应该与 Airflow 的未来版本向前兼容。

然而,反之则不保证,因此如果你的监听器是针对较新版本的接口实现的,它可能无法与旧版本的 Airflow 一起工作。如果你的目标是单个 Airflow 版本,这不成问题,因为你可以根据你使用的 Airflow 版本调整实现,但如果你正在编写可与不同版本 Airflow 一起使用的插件或扩展,这一点就很重要。

例如,如果在接口中添加了新字段(如 2.10.0 版本 on_task_instance_failed 方法中的 error 字段),监听器实现将无法处理事件对象中不存在该字段的情况,此类监听器将仅适用于 Airflow 2.10.0 及更高版本。

为了实现一个与多个 Airflow 版本兼容的监听器,包括使用在较新版本的 Airflow 中添加的功能和字段,你应该检查使用的 Airflow 版本,并使用较新版本的接口实现,但对于旧版本的 Airflow,你应该使用旧版本的接口。

例如,如果你想实现一个使用 on_task_instance_failed 中的 error 字段的监听器,你应该使用如下代码

from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance):
            # Handle no error case here
            pass

自 2.8.0 版本引入监听器接口以来的变更列表

Airflow 版本

受影响方法

变更

2.10.0

on_task_instance_failed

接口中添加了 error 字段

3.0.0

on_task_instance_running

从任务实例监听器中移除 session 参数,task_instance 对象现在是 RuntimeTaskInstance 的实例

3.0.0

on_task_instance_failed, on_task_instance_success

从任务实例监听器中移除 session 参数,task_instance 对象在 worker 上是 RuntimeTaskInstance 的实例,在 API server 上是 TaskInstance 的实例

此条目有帮助吗?