监听器

您可以编写监听器,以便在事件发生时通知您。这些监听器由 Pluggy 提供支持。

警告

监听器是 Airflow 的高级功能。它们与它们运行所在的 Airflow 组件不是隔离的,并且可能会减慢速度,或者在某些情况下导致您的 Airflow 实例崩溃。因此,在编写监听器时应格外小心。

Airflow 支持以下事件的通知

生命周期事件

  • on_starting

  • before_stopping

生命周期事件允许您对 Airflow 的 Job 的启动和停止事件做出反应,例如 SchedulerJobBackfillJob

DagRun 状态变更事件

DagRun 更改状态时,会发生 DagRun 状态变更事件。

  • on_dag_run_running

airflow/example_dags/plugins/event_listener.py[源码]

@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run  in running state")
    queued_at = dag_run.queued_at
    dag_hash_info = dag_run.dag_hash

    print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")


  • on_dag_run_success

airflow/example_dags/plugins/event_listener.py[源码]

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date

    print(f"Dag run start:{start_date} end:{end_date}")


  • on_dag_run_failed

airflow/example_dags/plugins/event_listener.py[源码]

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run  in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    external_trigger = dag_run.external_trigger

    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
    print(f"Failed with message: {msg}")


TaskInstance 状态变更事件

TaskInstance 更改状态时,会发生 TaskInstance 状态变更事件。您可以使用这些事件来响应 LocalTaskJob 的状态更改。

  • on_task_instance_running

airflow/example_dags/plugins/event_listener.py[源码]

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to RUNNING.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that is running its dag_run,
    task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    state: TaskInstanceState = task_instance.state
    name: str = task_instance.task_id
    start_date = task_instance.start_date

    dagrun = task_instance.dag_run
    dagrun_status = dagrun.state

    task = task_instance.task

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name} state:{state} start_date:{start_date}")
    print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")


  • on_task_instance_success

airflow/example_dags/plugins/event_listener.py[源码]

@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to SUCCESS.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has succeeded its
    dag_run, task and dag information.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)

    dag_id = task_instance.dag_id
    hostname = task_instance.hostname
    operator = task_instance.operator

    dagrun = task_instance.dag_run
    queued_at = dagrun.queued_at
    print(f"Dag name:{dag_id} queued_at:{queued_at}")
    print(f"Task hostname:{hostname} operator:{operator}")


  • on_task_instance_failed

airflow/example_dags/plugins/event_listener.py[源码]

@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
    """
    This method is called when task state changes to FAILED.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has failed its dag_run,
    task and dag information.
    """
    print("Task instance in failure state")

    start_date = task_instance.start_date
    end_date = task_instance.end_date
    duration = task_instance.duration

    dagrun = task_instance.dag_run

    task = task_instance.task

    if TYPE_CHECKING:
        assert task

    dag = task.dag

    print(f"Task start:{start_date} end:{end_date} duration:{duration}")
    print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
    if error:
        print(f"Failure caused by {error}")


数据集事件

  • on_dataset_created

  • on_dataset_changed

当运行数据集管理操作时,会发生数据集事件。

Dag 导入错误事件

  • on_new_dag_import_error

  • on_existing_dag_import_error

当 dag 处理器在 Dag 代码中发现导入错误并更新元数据数据库表时,会发生 Dag 导入错误事件。

这是一个实验性功能

用法

要创建监听器

  • import airflow.listeners.hookimpl

  • 为要生成通知的事件实现 hookimpls

Airflow 将规范定义为 hookspec。您的实现必须接受与 hookspec 中定义的相同命名参数。如果您不使用与 hookspec 相同的参数,则当您尝试使用您的插件时,Pluggy 会抛出错误。但是您不需要实现每个方法。许多监听器只实现一个方法或方法的一个子集。

要将监听器包含在您的 Airflow 安装中,请将其作为 Airflow 插件 的一部分包含进来。

监听器 API 旨在跨所有 DAG 和所有操作符调用。您无法监听特定 DAG 生成的事件。对于该行为,请尝试 on_success_callbackpre_execute 等方法。这些为特定 DAG 作者或操作符创建者提供回调。日志和 print() 调用将作为监听器的一部分进行处理。

兼容性说明

监听器接口可能会随着时间的推移而更改。我们正在使用 pluggy 规范,这意味着为较旧版本的接口编写的监听器的实现应与未来版本的 Airflow 向前兼容。

但是,不能保证相反的情况,因此如果您的监听器是针对较新版本的接口实现的,则它可能无法与较旧版本的 Airflow 一起使用。如果您以单个版本的 Airflow 为目标,则这不是问题,因为您可以调整您的实现以适应您使用的 Airflow 版本,但如果您编写的插件或扩展可能与不同版本的 Airflow 一起使用,则这一点很重要。

例如,如果在接口中添加了新字段(例如 2.10.0 中 on_task_instance_failed 方法中的 error 字段),则监听器实现将无法处理事件对象中不存在该字段的情况,并且此类监听器仅适用于 Airflow 2.10.0 及更高版本。

为了实现与多个版本的 Airflow 兼容的监听器,包括使用较新版本的 Airflow 中添加的功能和字段,您应该检查使用的 Airflow 版本并使用较新版本的接口实现,但对于较旧版本的 Airflow,您应该使用较旧版本的接口。

例如,如果您想实现一个使用 on_task_instance_failed 中的 error 字段的监听器,则应使用如下代码

from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(
            self, previous_state, task_instance, error: None | str | BaseException, session
        ):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, session):
            # Handle no error case here
            pass

自 2.8.0 引入以来,监听器接口中的更改列表

Airflow 版本

受影响的方法

更改

2.10.0

on_task_instance_failed

向接口添加了错误字段

此条目是否对您有帮助?