可延迟操作符 & 触发器¶
标准的 操作符 和 传感器 在整个运行期间都会占用一个完整的工作槽,即使它们处于空闲状态。例如,如果您只有 100 个工作槽可用于运行任务,并且有 100 个 DAG 等待当前正在运行但空闲的传感器,那么您无法运行其他任何内容 - 即使您的整个 Airflow 集群基本上处于空闲状态。reschedule
传感器模式通过允许传感器仅以固定的时间间隔运行来解决其中的一些问题,但这不灵活,并且仅允许使用时间作为恢复的原因,而不是其他条件。
这就是可以使用可延迟操作符的地方。当操作符无事可做只能等待时,它可以通过延迟来暂停自身并释放工作槽以用于其他进程。当操作符延迟时,执行会移动到触发器,在那里将运行操作符指定的触发器。触发器可以执行操作符所需的轮询或等待。然后,当触发器完成轮询或等待时,它会发送信号让操作符恢复执行。在执行的延迟阶段,由于工作已卸载到触发器,因此任务不再占用工作槽,并且您有更多可用的工作负载容量。默认情况下,处于延迟状态的任务不占用池槽。如果您希望它们占用,可以通过编辑相关池来更改此设置。
触发器是小的、异步的 Python 代码片段,旨在在单个 Python 进程中运行。由于它们是异步的,因此它们可以高效地共存于 Airflow 组件的触发器中。
此过程如何工作的概述
任务实例(正在运行的操作符)到达必须等待其他操作或条件的位置,并使用与恢复它的事件相关的触发器来延迟自身。这会释放工作器以运行其他内容。
新的触发器实例由 Airflow 注册,并由触发器进程拾取。
触发器运行直到触发,此时其源任务由调度程序重新调度。
调度程序将任务排队以便在工作节点上恢复。
您可以像 DAG 作者一样使用预先编写的可延迟操作符,也可以编写自己的操作符。但是,编写它们需要满足某些设计标准。
使用可延迟操作符¶
如果您想使用 Airflow 自带的预先编写的可延迟操作符,例如 TimeSensorAsync
,那么您只需要完成两个步骤
确保您的 Airflow 安装至少运行一个
触发器
进程,以及正常的调度器
在您的 DAG 中使用可延迟操作符/传感器
Airflow 会自动处理和实现您的延迟过程。
如果您要升级现有 DAG 以使用可延迟操作符,Airflow 包含 API 兼容的传感器变体,例如 TimeSensorAsync
用于 TimeSensor
。将这些变体添加到您的 DAG 中即可使用可延迟操作符,无需其他更改。
请注意,您不能从自定义 PythonOperator 或 TaskFlow Python 函数内部使用延迟功能。延迟仅适用于传统的基于类的操作符。
编写可延迟操作符¶
编写可延迟操作符时,以下是需要考虑的要点
您的操作符必须使用触发器延迟自身。您可以使用 Airflow 核心中包含的触发器,也可以编写自定义触发器。
您的操作符在延迟时将被停止并从其工作器中删除,并且不会自动保留状态。您可以通过指示 Airflow 在特定方法处恢复操作符或传递某些 kwargs 来保留状态。
您可以多次延迟,并且可以在操作符执行重要工作之前或之后延迟。或者,您可以在满足某些条件时延迟。例如,如果系统没有立即的答案。延迟完全由您控制。
任何操作符都可以延迟;无需在其类上进行特殊标记,并且不限于传感器。
如果要添加支持可延迟和不可延迟模式的操作符或传感器,建议添加
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)
到操作符的__init__
方法,并使用它来决定是否在可延迟模式下运行操作符。您可以通过operator
部分中的default_deferrable
配置所有支持在可延迟和不可延迟模式之间切换的操作符和传感器的deferrable
的默认值。以下是一个支持两种模式的传感器的示例。
import time
from datetime import timedelta
from typing import Any
from airflow.configuration import conf
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def __init__(
self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
) -> None:
super().__init__(**kwargs)
self.deferrable = deferrable
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=TimeDeltaTrigger(timedelta(hours=1)),
method_name="execute_complete",
)
else:
time.sleep(3600)
def execute_complete(
self,
context: Context,
event: dict[str, Any] | None = None,
) -> None:
# We have no more work to do here. Mark as complete.
return
编写触发器¶
触发器被编写为一个继承自 BaseTrigger
的类,并实现三个方法
__init__
: 一个接收来自实例化它的操作符的参数的方法。从 2.10.0 开始,我们能够直接从预定义的触发器启动任务执行。要利用此功能,__init__
中的所有参数都必须是可序列化的。run
: 一个异步方法,它运行其逻辑并产生一个或多个TriggerEvent
实例作为异步生成器。serialize
: 返回重建此触发器所需的信息,作为类路径的元组,以及传递给__init__
的关键字参数。
此示例显示了一个基本触发器的结构,它是 Airflow 的 DateTimeTrigger
的一个非常简化的版本
import asyncio
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
class DateTimeTrigger(BaseTrigger):
def __init__(self, moment):
super().__init__()
self.moment = moment
def serialize(self):
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
async def run(self):
while self.moment > timezone.utcnow():
await asyncio.sleep(1)
yield TriggerEvent(self.moment)
代码示例显示了几件事
__init__
和serialize
作为一对编写。触发器在操作符作为其延迟请求的一部分提交时实例化一次,然后序列化并在运行触发器的任何触发器进程上重新实例化。run
方法被声明为async def
,因为它必须是异步的,并且使用asyncio.sleep
而不是常规的time.sleep
(因为这会阻塞进程)。当它发出事件时,它会将
self.moment
打包到那里,因此如果此触发器在多个主机上冗余运行,则可以对事件进行重复数据删除。
只要它们满足设计约束,触发器可以像您想要的那样复杂或简单。它们可以以高可用性的方式运行,并且在运行触发器的主机之间自动分配。我们鼓励您避免在触发器中使用任何类型的持久状态。触发器应该从它们的 __init__
中获取它们需要的所有内容,以便它们可以被序列化并自由移动。
如果您是异步 Python 新手,编写 run()
方法时要格外小心。Python 的异步模型意味着,如果代码在执行阻塞操作时没有正确使用 await
,则可能阻塞整个进程。Airflow 会尝试检测阻塞进程的代码,并在发生阻塞时在触发器日志中发出警告。您可以通过在编写触发器时设置变量 PYTHONASYNCIODEBUG=1
来启用额外的 Python 检查,以确保您编写的是非阻塞代码。执行文件系统调用时要格外小心,因为如果底层文件系统是基于网络的,则可能会发生阻塞。
编写自己的触发器时,需要注意一些设计约束:
run
方法必须是异步的(使用 Python 的 asyncio),并且在执行阻塞操作时正确使用await
。run
必须yield
其 TriggerEvents,而不是返回它们。如果它在至少产生一个事件之前就返回,Airflow 会将其视为错误,并使任何等待它的任务实例失败。如果它抛出异常,Airflow 也会使任何依赖的任务实例失败。您应该假设一个触发器实例可以运行多次。如果发生网络分区并且 Airflow 在分离的机器上重新启动触发器,则可能会发生这种情况。因此,您必须注意副作用。例如,您可能不想使用触发器来插入数据库行。
如果您的触发器设计为发出多个事件(目前不支持),则每个发出的事件必须包含一个有效负载,如果触发器在多个位置运行,该有效负载可用于对事件进行重复数据删除。如果您只触发一个事件并且不需要将信息传递回运算符,则只需将有效负载设置为
None
。触发器可能会突然从一个触发器服务中移除,并在一个新的服务上启动。例如,如果子网发生更改,导致网络分区,或者发生部署。如果需要,您可以实现
cleanup
方法,该方法始终在run
之后调用,无论触发器是正常退出还是其他情况。为了使对触发器的任何更改生效,每当修改触发器时,都需要重新启动触发器。
注意
目前,触发器仅使用到其第一个事件,因为它们仅用于恢复延迟的任务,并且任务在第一个事件触发后恢复。但是,Airflow 计划在未来允许从触发器启动 DAG,这将使多事件触发器更有用。
触发器中的敏感信息¶
自 Airflow 2.9.0 起,触发器 kwargs 在存储到数据库之前会被序列化和加密。这意味着您传递给触发器的任何敏感信息都将以加密形式存储在数据库中,并在从数据库读取时解密。
触发延迟¶
如果您想在运算符中的任何位置触发延迟,可以调用 self.defer(trigger, method_name, kwargs, timeout)
。这会为 Airflow 引发一个特殊异常。参数如下:
trigger
:您要延迟到的触发器的实例。它将被序列化到数据库中。method_name
:您希望 Airflow 在恢复时调用的运算符上的方法名称。kwargs
:(可选)当调用该方法时要传递的其他关键字参数。默认为{}
。timeout
:(可选)一个 timedelta,指定此延迟失败的超时时间,并使任务实例失败。默认为None
,表示没有超时。
以下是一个传感器如何触发延迟的基本示例:
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
def execute(self, context: Context) -> None:
self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
当您选择延迟时,您的运算符将停止在该点执行,并从其当前工作进程中删除。不会持久化任何状态,例如局部变量或在 self
上设置的属性。当您的运算符恢复时,它会作为它的新实例恢复。您可以将状态从运算符的旧实例传递到新实例的唯一方法是使用 method_name
和 kwargs
。
当您的运算符恢复时,Airflow 会将 context
对象和一个 event
对象添加到传递给 method_name
方法的 kwargs 中。这个 event
对象包含恢复您的运算符的触发器事件中的有效负载。根据触发器,这对您的运算符可能很有用,例如状态代码或用于获取结果的 URL。或者,它可能是无关紧要的信息,例如日期时间。但是,您的 method_name
方法必须接受 context
和 event
作为关键字参数。
如果您的运算符从其第一个 execute()
方法(当它是新的时)或由 method_name
指定的后续方法返回,则它将被视为已完成并完成执行。
如果您希望运算符只有一个入口点,则可以将 method_name
设置为 execute
,但它还必须接受 event
作为可选关键字参数。
让我们深入了解上面的 WaitOneHourSensor
示例。这个传感器只是触发器的一个薄包装。它延迟到触发器,并指定触发器触发时返回的不同方法。当它立即返回时,它会将传感器标记为成功。
self.defer
调用会引发 TaskDeferred
异常,因此它可以在您的运算符代码中的任何位置工作,即使嵌套在 execute()
内的许多调用中。您也可以手动引发 TaskDeferred
,它使用与 self.defer
相同的参数。
运算符上的 execution_timeout
是根据总运行时间确定的,而不是延迟之间的单独执行。这意味着如果设置了 execution_timeout
,则运算符可能会在延迟期间或在延迟后运行期间失败,即使它只恢复了几秒钟。
从任务启动触发延迟¶
2.10.0 版本新增。
如果您想直接将任务延迟到触发器,而无需进入工作进程,可以将类级别属性 start_from_trigger
设置为 True
,并为您的可延迟运算符添加一个具有以下 4 个属性的 StartTriggerArgs
对象的类级别属性 start_trigger_args
:
trigger_cls
:指向您的触发器类的可导入路径。trigger_kwargs
:初始化trigger_cls
时要传递的关键字参数。请注意,所有参数都需要可序列化。这是此功能的主要限制。next_method
:您希望 Airflow 在恢复时调用的运算符上的方法名称。next_kwargs
:调用next_method
时要传递的其他关键字参数。timeout
:(可选)一个 timedelta,指定此延迟失败的超时时间,并使任务实例失败。默认为None
,表示没有超时。
当延迟是 execute
方法唯一的操作时,这尤其有用。以下是前面示例的基本改进。在前面的示例中,我们使用了 DateTimeTrigger
,它接受一个类型为 datetime.timedelta
的参数 delta
,该参数不可序列化。因此,我们需要创建一个具有可序列化参数的新触发器。
from __future__ import annotations
import datetime
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
class HourDeltaTrigger(DateTimeTrigger):
def __init__(self, hours: int):
moment = timezone.utcnow() + datetime.timedelta(hours=hours)
super().__init__(moment=moment)
在传感器部分,我们需要提供 HourDeltaTrigger
的路径作为 trigger_cls
。
from __future__ import annotations
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs
from airflow.utils.context import Context
class WaitOneHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
也可以在实例级别修改 start_from_trigger
和 trigger_kwargs
,以实现更灵活的配置。
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitTwoHourSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = {"hours": 2}
self.start_from_trigger = True
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
映射任务的初始化阶段发生在调度器将其提交给执行器之后。因此,此功能提供的动态任务映射支持有限,其用法与标准实践不同。要启用动态任务映射支持,您需要在 __init__
方法中定义 start_from_trigger
和 trigger_kwargs
。请注意,您无需同时定义这两个参数即可使用此功能,但您需要使用完全相同的参数名称。例如,如果您将一个参数定义为 t_kwargs
并将此值赋给 self.start_trigger_args.trigger_kwargs
,则它不会有任何效果。当映射一个 start_from_trigger
设置为 True 的任务时,将跳过整个 __init__
方法。调度器将使用来自 partial
和 expand
的提供的 start_from_trigger
和 trigger_kwargs
(如果未提供,则回退到类属性中的值),以确定是否以及如何将任务提交给执行器或触发器。请注意,XCom 值在此阶段不会被解析。
触发器执行完成后,任务可能会被发回给 worker 执行 next_method
,或者任务实例可能会直接结束。(请参阅 从触发器退出延迟任务)如果任务被发回给 worker,则 __init__
方法中的参数仍然会在 next_method
执行之前生效,但它们不会影响触发器的执行。
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class WaitHoursSensor(BaseSensorOperator):
# You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
trigger_kwargs={"hours": 1},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
def __init__(
self,
*args: list[Any],
trigger_kwargs: dict[str, Any] | None,
start_from_trigger: bool,
**kwargs: dict[str, Any],
) -> None:
# This whole method will be skipped during dynamic task mapping.
super().__init__(*args, **kwargs)
self.start_trigger_args.trigger_kwargs = trigger_kwargs
self.start_from_trigger = start_from_trigger
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
# We have no more work to do here. Mark as complete.
return
这将扩展为 2 个任务,它们的“hours”参数分别设置为 1 和 2。
WaitHoursSensor.partial(task_id="wait_for_n_hours", start_from_trigger=True).expand(
trigger_kwargs=[{"hours": 1}, {"hours": 2}]
)
从触发器退出延迟任务¶
2.10.0 版本新增。
如果您想直接从触发器中退出任务,而无需进入 worker,您可以指定实例级别的属性 end_from_trigger
以及您的可延迟操作符的属性,如上所述。这可以节省启动新 worker 所需的一些资源。
触发器可以有两种选择:它们可以将执行发回给 worker,或者直接结束任务实例。如果触发器本身结束任务实例,则 method_name
无关紧要,可以为 None
。否则,请提供在任务中恢复执行时应使用的 method_name
。
class WaitFiveHourSensorAsync(BaseSensorOperator):
# this sensor always exits from trigger.
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.end_from_trigger = True
def execute(self, context: Context) -> NoReturn:
self.defer(
method_name=None,
trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
)
TaskSuccessEvent
和 TaskFailureEvent
是可用于直接结束任务实例的两个事件。这会将任务标记为状态 task_instance_state
,并可选择性地推送 xcom(如果适用)。以下是如何使用这些事件的示例
class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
super().__init__()
self.duration = duration
self.end_from_trigger = end_from_trigger
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"your_module.WaitFiveHourTrigger",
{"duration": self.duration, "end_from_trigger": self.end_from_trigger},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
yield TaskSuccessEvent()
else:
yield TriggerEvent({"duration": self.duration})
在上面的示例中,如果 end_from_trigger
设置为 True
,则触发器将通过产生 TaskSuccessEvent
直接结束任务实例。否则,它将使用操作符中指定的方法恢复任务实例。
注意
仅当没有为可延迟操作符集成监听器时,从触发器退出才有效。目前,当可延迟操作符的 end_from_trigger
属性设置为 True
并且集成了监听器时,在解析期间会引发异常,以指示此限制。在编写自定义触发器时,请确保如果从插件添加了监听器,则触发器不会设置为直接结束任务实例。如果触发器的作者将 end_from_trigger
属性更改为不同的属性,则 DAG 解析不会引发任何异常,并且依赖于此任务的监听器将无法工作。此限制将在未来的版本中解决。
高可用性¶
触发器设计为在高可用性 (HA) 架构中工作。如果您想运行高可用性设置,请在多个主机上运行多个 triggerer
副本。与 scheduler
非常相似,它们会自动通过正确的锁定和 HA 共存。
根据触发器执行的工作量,您可以在单个 triggerer
主机上容纳数百到数万个触发器。默认情况下,每个 triggerer
的容量为 1000 个,它可以尝试同时运行这些触发器。您可以使用 --capacity
参数更改可以同时运行的触发器数量。如果您尝试运行的触发器数量超过了所有 triggerer
进程的容量,则某些触发器的运行将被延迟,直到其他触发器完成。
Airflow 尝试一次只在一个地方运行触发器,并保持对当前正在运行的所有 triggerer
的心跳。如果 triggerer
死机,或者与运行 Airflow 数据库的网络断开连接,Airflow 会自动重新安排该主机上的触发器在其他地方运行。Airflow 会等待(2.1 * triggerer.job_heartbeat_sec
)秒,让机器重新出现,然后再重新安排触发器。
这意味着触发器有可能(但不常见)在多个地方同时运行。但是,此行为已设计到触发器约定中,并且是预期行为。当触发器在多个地方同时运行时,Airflow 会删除重复的事件,因此此过程对您的操作员是透明的。
请注意,您运行的每个额外的 triggerer
都会导致与数据库建立额外的持久连接。
传感器中 Mode='reschedule' 和 Deferrable=True 的区别¶
在 Airflow 中,传感器会等待满足特定条件,然后才继续执行下游任务。传感器有两种管理空闲期的方式:mode='reschedule'
和 deferrable=True
。由于 mode='reschedule'
是 Airflow 中 BaseSensorOperator 特有的参数,因此如果未满足条件,它允许传感器重新安排自身。'deferrable=True'
是某些操作符使用的一种约定,表示该任务可以稍后重试(或延迟),但它不是 Airflow 中的内置参数或模式。重试任务的实际行为因具体操作符实现而异。
mode='reschedule' |
deferrable=True |
---|---|
持续重新安排自身,直到满足条件 |
空闲时暂停执行,条件更改时恢复执行 |
资源使用率较高(重复执行) |
资源使用率较低(空闲时暂停,释放 worker 插槽) |
预期条件会随着时间变化(例如,文件创建) |
等待外部事件或资源(例如,API 响应) |
用于重新安排的内置功能 |
需要自定义逻辑来延迟任务并处理外部更改 |