Listeners

You can write listeners so that Airflow notifies you when events happen. Listeners are powered by Pluggy.

Warning

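Listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in, so they can slow down an Airflow instance and, in some cases, bring it down. Take extra care when writing listeners.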

Airflow supports notifications for the following events:

Lifecycle events

  • on_starting

  • before_stopping

Lifecycle events let you react to the start and stop of an Airflow Job, such as SchedulerJob.
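As a minimal sketch of a lifecycle listener, the module below implements both hooks. It assumes the hookspec passes the starting or stopping job as a single argument named `component`; the `describe` helper and the printed messages are hypothetical and purely illustrative. The import fallback exists only so the sketch can be run outside an Airflow installation.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func


def describe(component) -> str:
    """Hypothetical helper: return a readable name for the component."""
    return type(component).__name__


@hookimpl
def on_starting(component):
    # Assumed to be called before the Airflow job (for example SchedulerJob) starts.
    print(f"Starting: {describe(component)}")


@hookimpl
def before_stopping(component):
    # Assumed to be called just before the Airflow job stops.
    print(f"Stopping: {describe(component)}")
```

Keeping the handlers this small is deliberate: listeners run inside the Airflow component itself, so slow or failing code here affects that component directly.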

DagRun state change events

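DagRun state change events are triggered when a DagRun changes state. Starting with Airflow 3, listeners are also notified when a state change is triggered through the API (for on_dag_run_success and on_dag_run_failed), for example when a DagRun is marked as success from the Airflow UI.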

  • on_dag_run_running

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run in running state")
    queued_at = dag_run.queued_at

    version = dag_run.version_number

    print(f"Dag information Queued at: {queued_at} version: {version}")


  • on_dag_run_success

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date

    print(f"Dag run start:{start_date} end:{end_date}")


  • on_dag_run_failed

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    run_type = dag_run.run_type

    print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
    print(f"Failed with message: {msg}")


TaskInstance state change events

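TaskInstance state change events are triggered when a RuntimeTaskInstance changes state. You can use these events to react to LocalTaskJob state changes. Starting with Airflow 3, listeners are also notified when a state change is triggered through the API (for on_task_instance_success and on_task_instance_failed), for example when a task instance is marked as success from the UI. In that case, the listener receives a TaskInstance instance rather than a RuntimeTaskInstance instance.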

  • on_task_instance_running

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_running(
    previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance
):
    """
    Called when task state changes to RUNNING.

    previous_state and task_instance object can be used to retrieve more information about current
    task_instance that is running, its dag_run, task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    name: str = task_instance.task_id

    context = task_instance.get_template_context()

    task = context["task"]

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name}")
    print(f"Dag name:{dag_name}")


  • on_task_instance_success

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance
):
    """
    Called when task state changes to SUCCESS.

    previous_state and task_instance object can be used to retrieve more information about current
    task_instance that has succeeded, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)

    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")

        print(f"Task operator:{task_instance.operator}")
        return

    context = task_instance.get_template_context()
    operator = context["task"]

    print(f"Task operator:{operator}")


  • on_task_instance_failed

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState | None,
    task_instance: RuntimeTaskInstance | TaskInstance,
    error: None | str | BaseException,
):
    """
    Called when task state changes to FAILED.

    previous_state, task_instance object and error can be used to retrieve more information about current
    task_instance that has failed, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in failure state")

    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")

        print(f"Task operator:{task_instance.operator}")
        if error:
            print(f"Failure caused by {error}")
        return

    context = task_instance.get_template_context()
    task = context["task"]

    if TYPE_CHECKING:
        assert task

    print("Task start")
    print(f"Task:{task}")
    if error:
        print(f"Failure caused by {error}")


  • on_task_instance_skipped

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_skipped(
    previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance
):
    """
    Called when a task instance skips itself during execution.

    This hook is called only when a task has started execution and then
    intentionally skips itself (e.g., by raising AirflowSkipException).

    Note: This function will NOT cover tasks that were skipped by scheduler, before execution began, such as:
        - Skips due to trigger rules (e.g., upstream failures)
        - Skips from operators like BranchPythonOperator, ShortCircuitOperator, or similar mechanisms
        - Any other situation in which the scheduler decides not to schedule a task for execution

    For comprehensive tracking of skipped tasks, use DAG-level listeners
    (on_dag_run_success/on_dag_run_failed) which may have access to all task states.
    """
    print("Task instance was skipped")

    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")
        return

    context = task_instance.get_template_context()
    task = context["task"]

    if TYPE_CHECKING:
        assert task

    print("Task start")
    print(f"Task:{task}")


Asset events

  • on_asset_created

  • on_asset_alias_created

  • on_asset_changed

Asset events occur when asset management operations are run.
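A minimal sketch of an asset listener follows. It assumes the hookspecs pass a single argument named `asset` (or `asset_alias` for the alias hook) and that the object exposes a `name` attribute; both are assumptions to verify against the hookspec of your Airflow version. The `asset_label` helper is hypothetical, and the import fallback exists only so the sketch can be run outside Airflow.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func


def asset_label(asset) -> str:
    """Hypothetical helper: prefer the object's name, else its string form."""
    return str(getattr(asset, "name", None) or asset)


@hookimpl
def on_asset_created(asset):
    # Argument name "asset" is an assumption about the hookspec.
    print(f"Asset created: {asset_label(asset)}")


@hookimpl
def on_asset_alias_created(asset_alias):
    # Argument name "asset_alias" is an assumption about the hookspec.
    print(f"Asset alias created: {asset_label(asset_alias)}")


@hookimpl
def on_asset_changed(asset):
    print(f"Asset changed: {asset_label(asset)}")
```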

Dag import error events

  • on_new_dag_import_error

  • on_existing_dag_import_error

Dag import error events are triggered when the Dag processor finds an import error in the Dag code and updates the metadata database table.

This is an experimental feature.
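The sketch below shows one way to react to import errors. It assumes both hooks receive arguments named `filename` and `stacktrace`, which should be checked against the hookspec of your Airflow version. The `summarize` helper is hypothetical, and the import fallback exists only so the sketch can be run outside Airflow.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func


def summarize(filename: str, stacktrace: str) -> str:
    """Hypothetical helper: keep only the last line of the stack trace."""
    lines = stacktrace.strip().splitlines()
    last_line = lines[-1] if lines else ""
    return f"{filename}: {last_line}"


@hookimpl
def on_new_dag_import_error(filename, stacktrace):
    # Argument names are an assumption about the hookspec.
    print(f"New import error in {summarize(filename, stacktrace)}")


@hookimpl
def on_existing_dag_import_error(filename, stacktrace):
    print(f"Existing import error reported for {summarize(filename, stacktrace)}")
```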

Usage

To create a listener:

  • import airflow.listeners.hookimpl

  • implement the hookimpls for the events you want to be notified about

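Airflow defines the specification as a hookspec. Your implementation must accept parameters with the same names as those defined in the hookspec. If the parameters do not match, Pluggy raises an error when the plugin is used. You do not need to implement every method, however; many listeners implement only one method or a subset of methods.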
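The name-matching rule can be seen with Pluggy alone. The sketch below does not use Airflow: the `"demo"` project name, `DemoSpec`, and both listener classes are hypothetical stand-ins. It shows that an implementation may accept a subset of the spec's arguments, while an argument name that is missing from the spec is rejected (in this standalone setup, at registration time).

```python
import pluggy

hookspec = pluggy.HookspecMarker("demo")
hookimpl = pluggy.HookimplMarker("demo")


class DemoSpec:
    @hookspec
    def on_task_instance_failed(self, previous_state, task_instance, error):
        """Stand-in for an Airflow hookspec."""


class SubsetListener:
    # Accepting only some of the spec's arguments is allowed.
    @hookimpl
    def on_task_instance_failed(self, task_instance):
        return f"failed: {task_instance}"


class MisnamedListener:
    # "ti" is not an argument name in the spec, so Pluggy rejects it.
    @hookimpl
    def on_task_instance_failed(self, ti):
        return f"failed: {ti}"


manager = pluggy.PluginManager("demo")
manager.add_hookspecs(DemoSpec)
manager.register(SubsetListener())

# Hooks are always called with keyword arguments named as in the spec.
results = manager.hook.on_task_instance_failed(
    previous_state="running", task_instance="task_a", error=None
)
print(results)

try:
    manager.register(MisnamedListener())
    rejected = False
except pluggy.PluginValidationError as exc:
    rejected = True
    print(f"Rejected: {exc}")
```

Because each implementation receives only the arguments it declares, a listener written against an older, smaller signature keeps working when the spec gains new arguments.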

To include a listener in your Airflow installation, make it part of an Airflow plugin.
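A minimal sketch of such a plugin is shown below, assuming the plugin class exposes its listeners through a `listeners` attribute that accepts listener instances. The names `DagRunAuditListener`, `DagRunAuditPlugin`, and `dag_run_audit_plugin` are hypothetical, and the import fallback exists only so the sketch can be run outside Airflow.

```python
try:
    from airflow.listeners import hookimpl
    from airflow.plugins_manager import AirflowPlugin
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func

    class AirflowPlugin:
        """Stand-in base class used only when Airflow is unavailable."""


class DagRunAuditListener:
    """Hypothetical class-based listener that reports failed Dag runs."""

    @hookimpl
    def on_dag_run_failed(self, dag_run, msg):
        print(f"Dag {dag_run.dag_id} run {dag_run.run_id} failed: {msg}")


class DagRunAuditPlugin(AirflowPlugin):
    # The plugin name is arbitrary here; pick one unique to your deployment.
    name = "dag_run_audit_plugin"
    # Assumption: listeners are registered by listing them on the plugin.
    listeners = [DagRunAuditListener()]
```

Placing this file in the plugins folder of the deployment is the usual way for Airflow to discover it.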

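The Listener API is meant to apply across all Dags and all operators. You cannot listen only to events generated by specific Dags. To implement callbacks for a specific Dag or operator, use methods such as on_success_callback and pre_execute. These provide callbacks for particular Dag authors or operator creators. Logs and print() calls are handled as part of the listeners.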

Compatibility note

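The listener interface may change over time. We use pluggy specifications, which means that listener implementations written for older versions of the interface should be forward-compatible with future versions of Airflow.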

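The reverse, however, is not guaranteed. A listener implemented against a newer version of the interface might not work with older versions of Airflow. This is not a problem if you target a single Airflow version, because you can adjust your implementation to that version. It matters if you are writing plugins or extensions that may be reused across different Airflow versions.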

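For example, the error field was added to the on_task_instance_failed method in 2.10.0. A listener implementation that does not handle the case where this field is absent from the event object will only work with Airflow 2.10.0 and later.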

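To implement a listener that is compatible with multiple Airflow versions, including the features and fields added in newer versions, check the running Airflow version in your code and use the newer interface implementation on newer versions and the older interface implementation on older versions.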

For example, to use the error field in on_task_instance_failed, you can use code like this:

from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance):
            # Handle no error case here
            pass

List of changes to the listener interfaces since 2.8.0, when they were introduced:

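| Airflow version | Affected method | Change |
|---|---|---|
| 2.10.0 | `on_task_instance_failed` | Added the `error` field to the interface |
| 3.0.0 | `on_task_instance_running` | The `session` argument was removed from task instance listeners; the `task_instance` object is now an instance of `RuntimeTaskInstance` |
| 3.0.0 | `on_task_instance_failed`, `on_task_instance_success` | The `session` argument was removed from task instance listeners; the `task_instance` object is a `RuntimeTaskInstance` on the worker and a `TaskInstance` on the API server |
| 3.2.0 | `on_task_instance_skipped` | Added the listener method to the interface |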
