监听器¶
您可以编写监听器,以便在事件发生时通知您。这些监听器由 Pluggy 提供支持。
警告
监听器是 Airflow 的高级功能。它们与它们运行所在的 Airflow 组件不是隔离的,并且可能会减慢速度,或者在某些情况下导致您的 Airflow 实例崩溃。因此,在编写监听器时应格外小心。
Airflow 支持以下事件的通知
生命周期事件¶
on_starting
before_stopping
生命周期事件允许您对 Airflow 的 Job
的启动和停止事件做出反应,例如 SchedulerJob
或 BackfillJob
。
DagRun 状态变更事件¶
当 DagRun
更改状态时,会发生 DagRun 状态变更事件。
on_dag_run_running
@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to RUNNING.
"""
print("Dag run in running state")
queued_at = dag_run.queued_at
dag_hash_info = dag_run.dag_hash
print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")
on_dag_run_success
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to SUCCESS.
"""
print("Dag run in success state")
start_date = dag_run.start_date
end_date = dag_run.end_date
print(f"Dag run start:{start_date} end:{end_date}")
on_dag_run_failed
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to FAILED.
"""
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
external_trigger = dag_run.external_trigger
print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Failed with message: {msg}")
TaskInstance 状态变更事件¶
当 TaskInstance
更改状态时,会发生 TaskInstance 状态变更事件。您可以使用这些事件来响应 LocalTaskJob
的状态更改。
on_task_instance_running
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
"""
This method is called when task state changes to RUNNING.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that is running its dag_run,
task and dag information.
"""
print("Task instance is in running state")
print(" Previous state of the Task instance:", previous_state)
state: TaskInstanceState = task_instance.state
name: str = task_instance.task_id
start_date = task_instance.start_date
dagrun = task_instance.dag_run
dagrun_status = dagrun.state
task = task_instance.task
if TYPE_CHECKING:
assert task
dag = task.dag
dag_name = None
if dag:
dag_name = dag.dag_id
print(f"Current task name:{name} state:{state} start_date:{start_date}")
print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
on_task_instance_success
@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
"""
This method is called when task state changes to SUCCESS.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that has succeeded its
dag_run, task and dag information.
"""
print("Task instance in success state")
print(" Previous state of the Task instance:", previous_state)
dag_id = task_instance.dag_id
hostname = task_instance.hostname
operator = task_instance.operator
dagrun = task_instance.dag_run
queued_at = dagrun.queued_at
print(f"Dag name:{dag_id} queued_at:{queued_at}")
print(f"Task hostname:{hostname} operator:{operator}")
on_task_instance_failed
@hookimpl
def on_task_instance_failed(
previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
"""
This method is called when task state changes to FAILED.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that has failed its dag_run,
task and dag information.
"""
print("Task instance in failure state")
start_date = task_instance.start_date
end_date = task_instance.end_date
duration = task_instance.duration
dagrun = task_instance.dag_run
task = task_instance.task
if TYPE_CHECKING:
assert task
dag = task.dag
print(f"Task start:{start_date} end:{end_date} duration:{duration}")
print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
if error:
print(f"Failure caused by {error}")
Dag 导入错误事件¶
on_new_dag_import_error
on_existing_dag_import_error
当 dag 处理器在 Dag 代码中发现导入错误并更新元数据数据库表时,会发生 Dag 导入错误事件。
这是一个实验性功能。
用法¶
要创建监听器
import
airflow.listeners.hookimpl
为要生成通知的事件实现
hookimpls
Airflow 将规范定义为 hookspec。您的实现必须接受与 hookspec 中定义的相同命名参数。如果您不使用与 hookspec 相同的参数,则当您尝试使用您的插件时,Pluggy 会抛出错误。但是您不需要实现每个方法。许多监听器只实现一个方法或方法的一个子集。
要将监听器包含在您的 Airflow 安装中,请将其作为 Airflow 插件 的一部分包含进来。
监听器 API 旨在跨所有 DAG 和所有操作符调用。您无法监听特定 DAG 生成的事件。对于该行为,请尝试 on_success_callback
和 pre_execute
等方法。这些为特定 DAG 作者或操作符创建者提供回调。日志和 print()
调用将作为监听器的一部分进行处理。
兼容性说明¶
监听器接口可能会随着时间的推移而更改。我们正在使用 pluggy
规范,这意味着为较旧版本的接口编写的监听器的实现应与未来版本的 Airflow 向前兼容。
但是,不能保证相反的情况,因此如果您的监听器是针对较新版本的接口实现的,则它可能无法与较旧版本的 Airflow 一起使用。如果您以单个版本的 Airflow 为目标,则这不是问题,因为您可以调整您的实现以适应您使用的 Airflow 版本,但如果您编写的插件或扩展可能与不同版本的 Airflow 一起使用,则这一点很重要。
例如,如果在接口中添加了新字段(例如 2.10.0 中 on_task_instance_failed
方法中的 error
字段),则监听器实现将无法处理事件对象中不存在该字段的情况,并且此类监听器仅适用于 Airflow 2.10.0 及更高版本。
为了实现与多个版本的 Airflow 兼容的监听器,包括使用较新版本的 Airflow 中添加的功能和字段,您应该检查使用的 Airflow 版本并使用较新版本的接口实现,但对于较旧版本的 Airflow,您应该使用较旧版本的接口。
例如,如果您想实现一个使用 on_task_instance_failed
中的 error
字段的监听器,则应使用如下代码
from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl
airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):
class ClassBasedListener:
...
@hookimpl
def on_task_instance_failed(
self, previous_state, task_instance, error: None | str | BaseException, session
):
# Handle error case here
pass
else:
class ClassBasedListener: # type: ignore[no-redef]
...
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, session):
# Handle no error case here
pass
自 2.8.0 引入以来,监听器接口中的更改列表
Airflow 版本 |
受影响的方法 |
更改 |
---|---|---|
2.10.0 |
|
向接口添加了错误字段 |