可延迟操作符和触发器¶
标准 操作符 和 传感器 在运行期间会占用一个完整的工作器插槽,即使它们处于空闲状态。例如,如果您只有 100 个工作器插槽可用于运行任务,并且您有 100 个 DAG 正在等待当前正在运行但处于空闲状态的传感器,那么您无法运行任何其他内容 - 即使您的整个 Airflow 集群基本上处于空闲状态。传感器中 reschedule
模式通过允许传感器仅在固定时间间隔内运行来解决此问题,但它不够灵活,并且只允许使用时间作为恢复的原因,而不是其他标准。
这是可以使用可推迟操作符的地方。当操作符除了等待之外无事可做时,它可以通过推迟来暂停自身并释放工作进程以供其他进程使用。当操作符推迟时,执行将移至触发器,操作符指定的触发器将在那里运行。触发器可以执行操作符所需的轮询或等待。然后,当触发器完成轮询或等待时,它将发送一个信号,让操作符恢复执行。在执行的推迟阶段,由于工作已卸载到触发器,因此任务不再占用工作进程槽,并且您将拥有更多空闲工作负载容量。默认情况下,处于推迟状态的任务不占用池槽。如果您希望它们占用池槽,可以通过编辑相关池来更改此设置。
触发器是小型异步 Python 代码,旨在在单个 Python 进程中运行。由于它们是异步的,因此它们都可以有效地共存于触发器 Airflow 组件中。
此流程的工作原理概述
任务实例(正在运行的操作符)达到必须等待其他操作或条件的点,并使用与事件关联的触发器推迟自身以恢复它。这释放了工作进程以运行其他内容。
新的触发器实例由 Airflow 注册,并由触发器进程选取。
触发器运行直到触发,此时其源任务由调度程序重新调度。
调度程序将任务排队,以便在工作进程节点上恢复。
您可以使用预先编写的可推迟操作符作为 DAG 作者,也可以编写自己的可推迟操作符。但是,编写它们要求它们满足某些设计标准。
使用可推迟操作符¶
如果您想使用 Airflow 附带的预先编写的可推迟操作符,例如 TimeSensorAsync
,那么您只需要完成两个步骤
确保您的 Airflow 安装至少运行一个
triggerer
进程,以及正常的scheduler
在您的 DAG 中使用可推迟操作符/传感器
Airflow 会自动为您处理和实现推迟流程。
如果您要升级现有 DAG 以使用可推迟操作符,Airflow 包含 API 兼容的传感器变体,例如 TimeSensorAsync
适用于 TimeSensor
。将这些变体添加到您的 DAG 中,以使用可推迟操作符,而无需进行其他更改。
请注意,您不能在自定义 PythonOperator 或 TaskFlow Python 函数内部使用推迟功能。推迟仅适用于传统的基于类的操作符。
编写可推迟操作符¶
在编写可推迟操作符时,这些是需要考虑的主要要点
您的操作符必须使用触发器推迟自身。您可以使用核心 Airflow 中包含的触发器,也可以编写自定义触发器。
在延迟期间,你的操作员将被停止并从其工作程序中移除,并且不会自动保留任何状态。你可以通过指示 Airflow 在特定方法处恢复操作员或通过传递特定 kwargs 来保留状态。
你可以多次延迟,并且可以在操作员执行重要工作之前或之后延迟。或者,如果满足某些条件,则可以延迟。例如,如果系统没有立即的答案。延迟完全由你控制。
任何操作员都可以延迟;其类中不需要特殊标记,并且不限于传感器。
如果你想添加同时支持可延迟和不可延迟模式的操作员或传感器,建议将
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)
添加到操作员的__init__
方法中,并使用它来决定是否以可延迟模式运行操作员。你可以通过operator
部分中的default_deferrable
为所有支持在可延迟和不可延迟模式之间切换的操作员和传感器配置deferrable
的默认值。以下是一个同时支持两种模式的传感器的示例。
import time
from datetime import timedelta
from typing import Any
from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def __init__(
self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
) -> None:
super().__init__(**kwargs)
self.deferrable = deferrable
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=TimeDeltaTrigger(timedelta(hours=1)),
method_name="execute_complete",
)
else:
time.sleep(3600)
def execute_complete(
self,
context: Context,
event: dict[str, Any] | None = None,
) -> None:
# We have no more work to do here. Mark as complete.
return
触发延迟¶
如果你想触发延迟,可以在操作员的任何位置调用 self.defer(trigger, method_name, kwargs, timeout)
。这会为 Airflow 引发一个特殊异常。参数为
trigger
:你想要延迟到的触发器的实例。它将被序列化到数据库中。method_name
:操作员上的方法名称,希望 Airflow 在恢复时调用该方法。kwargs
:当调用该方法时要传递给该方法的(可选)其他关键字参数。默认为{}
。超时
:(可选)指定此延迟将失败且任务实例将失败的时间增量。默认为无
,表示无超时。
当您选择延迟时,您的操作员将停止在该点执行并从其当前工作程序中移除。不会保留任何状态,例如 self
上设置的局部变量或属性。当您的操作员恢复时,它将作为其新实例恢复。您可以将状态从操作员的旧实例传递到新实例的唯一方法是使用 方法名称
和 关键字参数
。
当您的操作员恢复时,Airflow 会向传递给 方法名称
方法的关键字参数添加一个 上下文
对象和一个 事件
对象。此 事件
对象包含恢复操作员的触发事件中的有效负载。根据触发器,这可能对您的操作员很有用,例如状态代码或用于获取结果的 URL。或者,它可能是无用的信息,例如日期时间。但是,您的 方法名称
方法必须接受 上下文
和 事件
作为关键字参数。
如果您的操作员从其第一个 execute()
方法(当它是新的时)或由 方法名称
指定的后续方法返回,它将被视为已完成并完成执行。
如果您希望您的操作员有一个入口点,则可以将 方法名称
设置为 execute
,但它还必须接受 事件
作为可选关键字参数。
以下是传感器如何触发延迟的基本示例
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def execute(self, context: Context) -> None:
self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
此传感器只是触发器周围的一个薄包装器。它延迟到触发器,并指定在触发器触发时返回的不同方法。当它立即返回时,它将传感器标记为成功。
self.defer
调用会引发 TaskDeferred
异常,因此它可以在操作员代码的任何位置工作,即使在 execute()
内部的许多嵌套调用中也是如此。您还可以手动引发 TaskDeferred
,它使用与 self.defer
相同的参数。
execution_timeout
在操作符上由总运行时间决定,而不是延迟之间的单独执行。这意味着如果设置了 execution_timeout
,操作符在延迟期间或延迟后运行期间可能会失败,即使它仅恢复了几秒钟。
编写触发器¶
触发器被编写为继承自 BaseTrigger
的类,并实现三个方法
__init__
:一种接收从实例化它的操作符发送过来的参数的方法。run
:一种运行其逻辑并生成一个或多个TriggerEvent
实例的异步方法,作为异步生成器。serialize
:返回重新构建此触发器所需的信息,作为类路径和关键字参数的元组,传递给__init__
。
在编写自己的触发器时,有一些设计约束需要注意
run
方法必须是异步的(使用 Python 的 asyncio),并在执行阻塞操作时正确地await
。run
必须yield
其 TriggerEvent,而不是返回它们。如果它在生成至少一个事件之前返回,Airflow 会将其视为错误,并使等待它的任何任务实例失败。如果它抛出异常,Airflow 也会使任何依赖的任务实例失败。您应该假设触发器实例可以运行多次。如果发生网络分区,并且 Airflow 在分离的机器上重新启动触发器,则可能会发生这种情况。因此,您必须注意副作用。例如,您可能不想使用触发器来插入数据库行。
如果触发器设计为发出多个事件(目前不支持),则每个发出的事件必须包含一个有效载荷,如果触发器在多个位置运行,则可以使用该有效载荷对事件进行去重。如果你只触发一个事件并且不需要将信息传递回操作员,则可以将有效载荷设置为
None
。触发器可以突然从一个触发器服务中移除并在一个新的触发器服务中启动。例如,如果子网发生更改并导致网络分区,或者如果存在部署。如果需要,你可以实现
cleanup
方法,该方法始终在run
之后调用,无论触发器是否正常退出。为了反映对触发器的任何更改,无论何时修改触发器,都需要重新启动触发器。
注意
目前,触发器仅用于其第一个事件,因为它们仅用于恢复延迟任务,并且任务在第一个事件触发后恢复。但是,Airflow 计划允许将来从触发器启动 DAG,这是多事件触发器更有用的地方。
此示例展示了基本触发器的结构,它是 Airflow 的 DateTimeTrigger
的一个非常简化的版本
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
class DateTimeTrigger(BaseTrigger):
def __init__(self, moment):
super().__init__()
self.moment = moment
def serialize(self):
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > timezone.utcnow():
await asyncio.sleep(1)
yield TriggerEvent(self.moment)
代码示例展示了几件事
__init__
和serialize
被写成一对。当触发器作为其延迟请求的一部分由操作员提交时,触发器会被实例化一次,然后在运行触发器的任何触发器进程上序列化并重新实例化。run
方法被声明为async def
,因为它必须是异步的,并且使用asyncio.sleep
而不是常规的time.sleep
(因为那样会阻塞进程)。当它发出事件时,它会打包
self.moment
,因此,如果此触发器在多个主机上冗余运行,则可以对事件进行去重。
触发器可以像你希望的那样复杂或简单,只要它们满足设计约束。它们可以在高可用性模式下运行,并且在运行触发器的各个主机之间自动分配。我们建议你在触发器中避免任何类型的持久状态。触发器应该从它们的 __init__
中获取它们需要的一切,这样它们就可以被序列化并自由移动。
如果你不熟悉编写异步 Python,在编写 run()
方法时要非常小心。Python 的异步模型意味着如果代码在执行阻塞操作时没有正确地 await
,它可能会阻塞整个进程。Airflow 会尝试检测进程阻塞代码,并在发生这种情况时在触发器日志中发出警告。你可以在编写触发器时设置变量 PYTHONASYNCIODEBUG=1
,以启用 Python 的额外检查,确保你编写的代码是非阻塞的。在进行文件系统调用时要特别小心,因为如果底层文件系统是网络支持的,它可能会阻塞。
触发器中的敏感信息¶
从 Airflow 2.9.0 开始,触发器 kwargs 在存储到数据库之前会被序列化并加密。这意味着你传递给触发器的任何敏感信息都将以加密形式存储在数据库中,并在从数据库中读取时解密。
高可用性¶
触发器被设计为在高可用性 (HA) 架构中工作。如果你想运行高可用性设置,请在多台主机上运行多个 triggerer
副本。与 scheduler
非常相似,它们会自动共存,具有正确的锁定和高可用性。
根据触发器执行的工作量,你可以在单个 triggerer
主机上容纳数百到数万个触发器。默认情况下,每个 triggerer
都有 1000 个触发器容量,它可以尝试一次运行这些触发器。你可以使用 --capacity
参数更改可以同时运行的触发器数量。如果你尝试运行的触发器数量超过所有 triggerer
进程的容量,一些触发器将被延迟运行,直到其他触发器完成。
Airflow 尝试一次只在一个地方运行触发器,并对当前正在运行的所有 触发器
维护一个心跳。如果某个 触发器
终止或与运行 Airflow 数据库的网络断开连接,Airflow 会自动重新安排该主机上的触发器在其他地方运行。Airflow 会等待 (2.1 * 触发器.job_heartbeat_sec
) 秒,以便机器重新出现,然后再重新安排触发器。
这意味着触发器有可能在多个地方同时运行,但这种可能性很小。但是,这种行为设计在触发器契约中,并且是预期行为。当触发器在多个地方同时运行时,Airflow 会对触发的事件进行去重,因此此过程对您的操作员是透明的。
请注意,您运行的每个额外的 触发器
都会导致与您的数据库建立一个额外的持久连接。
传感器中 Mode=’reschedule’ 和 Deferrable=True 之间的差异¶
在 Airflow 中,传感器会等待满足特定条件,然后再继续执行下游任务。传感器有两种管理空闲时段的选项:mode='reschedule'
和 deferrable=True
。由于 mode='reschedule'
是 Airflow 中 BaseSensorOperator 特有的参数,因此如果未满足条件,它允许传感器重新安排自身。 'deferrable=True'
是某些操作员用来表示稍后可以重试(或延迟)任务的约定,但它不是 Airflow 中的内置参数或模式。重试任务的实际行为因特定的操作员实现而异。
mode=’reschedule’ |
deferrable=True |
---|---|
持续重新安排自身,直到满足条件 |
空闲时暂停执行,条件更改时恢复 |
资源使用率较高(重复执行) |
资源使用率较低(空闲时暂停,释放工作器插槽) |
预期条件会随着时间而改变(例如文件创建) |
等待外部事件或资源(例如 API 响应) |
用于重新安排的内置功能 |
需要自定义逻辑来延迟任务并处理外部更改 |