传感器

传感器是一种特殊类型的 Operator,旨在只做一件事——等待某件事发生。它可以基于时间,或等待文件,或外部事件,但它们的唯一工作就是等待直到某事发生,然后 成功,使其下游任务能够运行。或者,如果该事件在配置的超时时间内未发生,失败,从而通过常规机制提醒你出现故障。

由于它们主要处于空闲状态,传感器有两种不同的运行模式,以便更高效地使用它们

  • `poke`(默认):传感器在整个运行期间占用一个工作槽位

  • `reschedule`:传感器仅在检查时占用工作槽位,检查之间会休眠设定的时间间隔

可以在实例化传感器时直接配置 `poke` 和 `reschedule` 模式;通常,它们之间的权衡是延迟。每秒检查一次的应使用 `poke` 模式,而每分钟检查一次的应使用 `reschedule` 模式。

与算子类似,Airflow 提供了大量预构建的传感器,可在核心 Airflow 中使用,也可以通过我们的*提供者*系统使用。

BaseSensorOperator 参数

Airflow 中的所有传感器最终都继承自 BaseSensorOperator(直接或间接)。该基类定义了控制传感器等待、重试以及管理工作资源的通用行为和参数。

自 Task SDK 重构后,BaseSensorOperator 在 Task SDK 中实现。由于提供者文档是单独生成的,这些参数可能并非在各个提供者传感器的 API 页面上直接可见。不过,它们适用于*所有*传感器。

通用参数

以下参数由 BaseSensorOperator 提供,适用于所有传感器

poke_interval

连续检查之间的时间间隔(秒)。在 poke 模式下,传感器在检查之间休眠并占用工作槽位。 在 reschedule 模式下,任务会被延迟并在该间隔后重新调度。

timeout

传感器允许运行的最长时间(秒),超过此时间则失败。此超时从第一次执行尝试开始计时,而不是每次检查计时。

mode

决定传感器如何占用工作资源。

  • `poke`(默认):在整个期间占用工作槽位

  • `reschedule`:在检查之间释放工作槽位

soft_fail

如果设置为 True,当超时发生时,传感器将被标记为 SKIPPED 而非 FAILED

exponential_backoff

如果启用,检查之间的时间间隔会指数级增长,直至 max_wait。在轮询不可预测可用性的外部系统时,这非常有用。

max_wait

当启用 exponential_backoff 时,检查之间延迟的上限(秒)。

权威 API 参考请参阅 Task SDK 对 BaseSensorOperator 的文档

https://airflow.org.cn/docs/task-sdk/stable/api.html#airflow.sdk.BaseSensorOperator

示例

BashSensor(
    task_id="wait_for_file",
    bash_command="test -f /data/input.csv",
    poke_interval=60,
    timeout=60 * 60,
    mode="reschedule",
)

提示

如果专门等待文件,可以考虑使用 FileSensor。它专为检测本地文件系统中的文件而构建,并支持可延迟模式,以提升资源效率。详情请参阅 FileSensor

通用传感器

Airflow 开箱即用提供了大量传感器。以下是标准提供者中一些常用的传感器

要查看可用传感器的完整列表,请参阅 算子和 Hook 参考

此条目是否有帮助?