Airflow 峰会 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

Airflow 的监听器插件

Airflow 具有通过插件添加监听器来监控和跟踪任务状态的功能。

这是一个简单的 Airflow 监听器插件示例,它有助于跟踪任务状态,并收集有关任务、DAG 运行和 DAG 的有用元数据信息。

这是一个 Airflow 插件示例,用于创建 Airflow 监听器插件。此插件通过使用 SQLAlchemy 的事件机制工作。它在表级别监视任务实例状态的变化并触发事件。这将通知所有 DAG 中所有任务的更改。

在此插件中,对象引用派生自基类 airflow.plugins_manager.AirflowPlugin

监听器插件底层使用了 pluggy 应用。Pluggy 是一个为 Pytest 构建的插件管理和 hook 调用应用。Pluggy 支持函数 hooking,因此它允许构建具有您自己定制 hooking 的“可插拔”系统。

使用此插件,可以监听以下事件:
  • 任务实例处于运行状态。

  • 任务实例处于成功状态。

  • 任务实例处于失败状态。

  • DAG 运行处于运行状态。

  • DAG 运行处于成功状态。

  • DAG 运行处于失败状态。

  • 在 Airflow 作业、调度器等事件开始之前

  • 在 Airflow 作业、调度器等事件停止之前

监听器注册

具有监听器对象引用的监听器插件作为 Airflow 插件的一部分注册。以下是实现新监听器的骨架:

from airflow.plugins_manager import AirflowPlugin

# This is the listener file created where custom code to monitor is added over hookimpl
import listener


class MetadataCollectionPlugin(AirflowPlugin):
    name = "MetadataCollectionPlugin"
    listeners = [listener]

接下来,我们可以查看添加到 listener 中的代码,并查看每个监听器的实现方法。实现完成后,监听器部分将在所有 DAG 的所有任务执行期间执行。

作为参考,这是 listener.py 类中的插件代码,显示了数据库中的表列表:

此示例监听任务实例处于运行状态时的事件。

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance):
    """
    Called when task state changes to RUNNING.

    previous_task_state and task_instance object can be used to retrieve more information about current
    task_instance that is running, its dag_run, task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    name: str = task_instance.task_id

    context = task_instance.get_template_context()

    task = context["task"]

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name}")
    print(f"Dag name:{dag_name}")


类似地,可以实现监听任务实例成功和失败后的代码。

此示例监听 DAG 运行更改为失败状态时的事件。

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run  in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    run_type = dag_run.run_type

    print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
    print(f"Failed with message: {msg}")


类似地,可以实现监听 DAG 运行成功后和运行状态期间的代码。

添加监听器实现所需的监听器插件文件作为 Airflow 插件的一部分添加到 $AIRFLOW_HOME/plugins/ 文件夹中,并在 Airflow 启动时加载。

此条目有帮助吗?