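Listeners
You can write listeners so that Airflow notifies you when events happen. These listeners are powered by Pluggy.
Warning
Listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in, and they can slow down your Airflow instance or, in some cases, take it down. Take extra care when writing listeners.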
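One way to limit the risk described in the warning is to keep the hook body small and catch its own errors. The following is a minimal sketch, not an official pattern: `send_alert` is a hypothetical helper, and the `ImportError` fallback exists only so the snippet can be run outside an Airflow environment.

```python
import logging

try:
    from airflow.listeners import hookimpl
except ImportError:
    # Fallback (assumption): a no-op decorator so the sketch runs without Airflow.
    def hookimpl(func):
        return func

log = logging.getLogger(__name__)


def send_alert(message: str) -> None:
    """Hypothetical helper standing in for a real notification call."""
    log.info("alert: %s", message)


@hookimpl
def on_dag_run_failed(dag_run, msg: str):
    """Report a failed Dag run without letting listener errors escape."""
    try:
        send_alert(f"{dag_run.dag_id} / {dag_run.run_id} failed: {msg}")
    except Exception:
        # Log and swallow: a bug in the listener should not disturb
        # the Airflow component that invoked it.
        log.exception("on_dag_run_failed listener raised")
```

Keeping slow work such as network calls out of the hook, or handing it to a background worker, is another option worth considering.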
Airflow supports notifications for the following events:
Lifecycle Events
on_starting
before_stopping
Lifecycle events allow you to react to start and stop events for an Airflow Job, such as SchedulerJob.
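A lifecycle listener could look like the sketch below. It assumes both hooks receive a single `component` argument (check the hookspec of your Airflow version), and the `ImportError` fallback is only there so the snippet runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Fallback (assumption): a no-op decorator so the sketch runs without Airflow.
    def hookimpl(func):
        return func


@hookimpl
def on_starting(component):
    """Called before an Airflow component, such as the scheduler job, starts."""
    print(f"Starting component: {type(component).__name__}")


@hookimpl
def before_stopping(component):
    """Called before an Airflow component stops."""
    print(f"Stopping component: {type(component).__name__}")
```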
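DagRun State Change Events
DagRun state change events occur when a DagRun changes state. Starting with Airflow 3, listeners are also notified when the state change is triggered through the API (for on_dag_run_success and on_dag_run_failed), for example when a DagRun is marked as success from the Airflow UI.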
on_dag_run_running
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run in running state")
    queued_at = dag_run.queued_at
    version = dag_run.version_number
    print(f"Dag information Queued at: {queued_at} version: {version}")
on_dag_run_success
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date
    print(f"Dag run start:{start_date} end:{end_date}")
on_dag_run_failed
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    run_type = dag_run.run_type
    print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
    print(f"Failed with message: {msg}")
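TaskInstance State Change Events
TaskInstance state change events occur when a RuntimeTaskInstance changes state. You can use these events to react to LocalTaskJob state changes. Starting with Airflow 3, listeners are also notified when the state change is triggered through the API (for on_task_instance_success and on_task_instance_failed), for example when a task instance is marked as success from the Airflow UI. In that case, the listener receives a TaskInstance instance instead of a RuntimeTaskInstance instance.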
on_task_instance_running
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance):
    """
    Called when task state changes to RUNNING.

    previous_state and task_instance object can be used to retrieve more information about current
    task_instance that is running, its dag_run, task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)
    name: str = task_instance.task_id
    context = task_instance.get_template_context()
    task = context["task"]
    if TYPE_CHECKING:
        assert task
    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name}")
    print(f"Dag name:{dag_name}")
on_task_instance_success
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance | TaskInstance
):
    """
    Called when task state changes to SUCCESS.

    previous_state and task_instance object can be used to retrieve more information about current
    task_instance that has succeeded, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)
    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")
        print(f"Task operator:{task_instance.operator}")
        return
    context = task_instance.get_template_context()
    operator = context["task"]
    print(f"Task operator:{operator}")
on_task_instance_failed
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState,
    task_instance: RuntimeTaskInstance | TaskInstance,
    error: None | str | BaseException,
):
    """
    Called when task state changes to FAILED.

    previous_state, task_instance object and error can be used to retrieve more information about current
    task_instance that has failed, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in failure state")
    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")
        print(f"Task operator:{task_instance.operator}")
        if error:
            print(f"Failure caused by {error}")
        return
    context = task_instance.get_template_context()
    task = context["task"]
    if TYPE_CHECKING:
        assert task
    print("Task start")
    print(f"Task:{task}")
    if error:
        print(f"Failure caused by {error}")
Asset Events
on_asset_created
on_asset_alias_created
on_asset_changed
Asset events occur when asset management operations are run.
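As a rough sketch, an asset listener might implement the three hooks as follows. The parameter names `asset` and `asset_alias` are assumptions about the hookspec and should be verified against your Airflow version; the `ImportError` fallback is only there so the snippet runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Fallback (assumption): a no-op decorator so the sketch runs without Airflow.
    def hookimpl(func):
        return func


@hookimpl
def on_asset_created(asset):
    """Called when a new asset is registered (parameter name assumed)."""
    print(f"Asset created: {asset!r}")


@hookimpl
def on_asset_alias_created(asset_alias):
    """Called when a new asset alias is registered (parameter name assumed)."""
    print(f"Asset alias created: {asset_alias!r}")


@hookimpl
def on_asset_changed(asset):
    """Called when an asset is updated (parameter name assumed)."""
    print(f"Asset changed: {asset!r}")
```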
Dag Import Error Events
on_new_dag_import_error
on_existing_dag_import_error
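Dag import error events occur when the Dag processor finds an import error in the Dag code and updates the metadata database table.
This is an experimental feature.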
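A listener for import errors might look like this sketch. It assumes both hooks receive `filename` and `stacktrace` arguments; confirm the names against the hookspec of your Airflow version. The `ImportError` fallback only keeps the snippet runnable without Airflow.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Fallback (assumption): a no-op decorator so the sketch runs without Airflow.
    def hookimpl(func):
        return func


@hookimpl
def on_new_dag_import_error(filename, stacktrace):
    """Called when a Dag file fails to import for the first time."""
    print(f"New import error in {filename}")
    print(stacktrace)


@hookimpl
def on_existing_dag_import_error(filename, stacktrace):
    """Called when a Dag file that already had an import error fails again."""
    print(f"Import error still present in {filename}")
    print(stacktrace)
```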
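Usage
To create a listener, import airflow.listeners.hookimpl and implement the hookimpls for the events you want to be notified about.
Airflow defines the specification as hookspec. Your implementation must accept the same named parameters as defined in hookspec. If you use parameters that differ from the hookspec, Pluggy throws an error when you try to use your plugin. You do not need to implement every method, though. Many listeners implement only one method or a subset of methods.
To include the listener in your Airflow installation, make it part of an Airflow plugin.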
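Registering a listener through a plugin can be sketched as follows. The class names and the plugin name are hypothetical, and the `ImportError` fallback (a no-op decorator and a stand-in base class) exists only so the snippet runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
    from airflow.plugins_manager import AirflowPlugin
except ImportError:
    # Fallbacks (assumption): stand-ins so the sketch runs without Airflow.
    def hookimpl(func):
        return func

    class AirflowPlugin:
        """Stand-in for airflow.plugins_manager.AirflowPlugin."""


class StartupListener:
    """Hypothetical class-based listener implementing a single hook."""

    @hookimpl
    def on_starting(self, component):
        print(f"Starting component: {type(component).__name__}")


class StartupListenerPlugin(AirflowPlugin):
    # Hypothetical plugin name; Airflow discovers the plugin and
    # registers every object listed in `listeners`.
    name = "startup_listener_plugin"
    listeners = [StartupListener()]
```

A module containing function-based hookimpls can be listed in `listeners` in the same way as a class instance.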
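The listener API is meant to be called across all Dags and all operators. You cannot listen only to events generated by specific Dags. For that behavior, try methods such as on_success_callback and pre_execute, which provide callbacks for particular Dag authors or operator creators. Logs and print() calls are handled as part of the listeners.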
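For comparison, a per-Dag callback is an ordinary function that receives the run context. The sketch below is illustrative only: `notify_success` is a hypothetical name, and the wiring is left as a comment because it needs a real Dag definition.

```python
def notify_success(context):
    """Hypothetical callback for a single Dag; `context` is the mapping Airflow passes in."""
    dag_run = context.get("dag_run")
    run_id = getattr(dag_run, "run_id", None)
    print(f"Run succeeded: {run_id}")


# Hypothetical wiring inside a Dag file:
#   DAG(dag_id="example", on_success_callback=notify_success)
```

Unlike a listener, this callback fires only for the Dag it is attached to.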
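Compatibility Note
The listener interface might change over time. We use pluggy specifications, which means that listener implementations written for older versions of the interface should be forward-compatible with future versions of Airflow.
However, the opposite is not guaranteed. If your listener is implemented against a newer version of the interface, it might not work with older versions of Airflow. This is not a problem if you target a single version of Airflow, because you can adjust your implementation to the version you use, but it matters if you are writing plugins or extensions that can be used with different versions of Airflow.
For example, if a new field is added to the interface (such as the error field of the on_task_instance_failed method in 2.10.0), a listener implementation that uses it cannot handle the case where the field is absent from the event object, so such listeners only work with Airflow 2.10.0 and later.
To implement a listener that is compatible with multiple Airflow versions, including features and fields added in newer versions, check the Airflow version in use. Use the newer interface implementation on newer versions and the older interface implementation on older versions.
For example, if you want to implement a listener that uses the error field in on_task_instance_failed, you should use code like the following: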
from importlib.metadata import version

from packaging.version import Version

from airflow.listeners import hookimpl

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance):
            # Handle no error case here
            pass
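List of changes in the listener interface since its introduction in version 2.8.0:
| Airflow version | Affected method | Change |
|---|---|---|
| 2.10.0 | on_task_instance_failed | error field added to the interface |
| 3.0.0 | on_task_instance_running | session argument removed from task instance listeners |
| 3.0.0 | on_task_instance_failed, on_task_instance_success | session argument removed from task instance listeners |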