传感器
传感器是一种特殊类型的 Operator,旨在只做一件事——等待某件事发生。它可以基于时间,或等待文件,或外部事件,但它们的唯一工作就是等待直到某事发生,然后 成功,使其下游任务能够运行。或者,如果该事件在配置的超时时间内未发生,失败,从而通过常规机制提醒你出现故障。
由于它们主要处于空闲状态,传感器有两种不同的运行模式,以便更高效地使用它们
`poke`(默认):传感器在整个运行期间占用一个工作槽位
`reschedule`:传感器仅在检查时占用工作槽位,检查之间会休眠设定的时间间隔
可以在实例化传感器时直接配置 `poke` 和 `reschedule` 模式;通常,它们之间的权衡是延迟。每秒检查一次的应使用 `poke` 模式,而每分钟检查一次的应使用 `reschedule` 模式。
与算子类似,Airflow 提供了大量预构建的传感器,可在核心 Airflow 中使用,也可以通过我们的*提供者*系统使用。
另请参阅
BaseSensorOperator 参数
Airflow 中的所有传感器最终都继承自 BaseSensorOperator(直接或间接)。该基类定义了控制传感器等待、重试以及管理工作资源的通用行为和参数。
自 Task SDK 重构后,BaseSensorOperator 在 Task SDK 中实现。由于提供者文档是单独生成的,这些参数可能并非在各个提供者传感器的 API 页面上直接可见。不过,它们适用于*所有*传感器。
通用参数
以下参数由 BaseSensorOperator 提供,适用于所有传感器
poke_interval连续检查之间的时间间隔(秒)。在
poke模式下,传感器在检查之间休眠并占用工作槽位。 在reschedule模式下,任务会被延迟并在该间隔后重新调度。timeout传感器允许运行的最长时间(秒),超过此时间则失败。此超时从第一次执行尝试开始计时,而不是每次检查计时。
mode决定传感器如何占用工作资源。
`poke`(默认):在整个期间占用工作槽位
`reschedule`:在检查之间释放工作槽位
soft_fail如果设置为
True,当超时发生时,传感器将被标记为SKIPPED而非FAILED。exponential_backoff如果启用,检查之间的时间间隔会指数级增长,直至
max_wait。在轮询不可预测可用性的外部系统时,这非常有用。max_wait当启用
exponential_backoff时,检查之间延迟的上限(秒)。
权威 API 参考请参阅 Task SDK 对 BaseSensorOperator 的文档
https://airflow.org.cn/docs/task-sdk/stable/api.html#airflow.sdk.BaseSensorOperator
示例
BashSensor(
task_id="wait_for_file",
bash_command="test -f /data/input.csv",
poke_interval=60,
timeout=60 * 60,
mode="reschedule",
)
提示
如果专门等待文件,可以考虑使用 FileSensor。它专为检测本地文件系统中的文件而构建,并支持可延迟模式,以提升资源效率。详情请参阅 FileSensor。
通用传感器
Airflow 开箱即用提供了大量传感器。以下是标准提供者中一些常用的传感器
FileSensor- 等待文件出现在文件系统中BashSensor- 等待 Bash 命令返回 truePythonSensor- 等待 Python 可调用对象返回 trueTimeSensor- 等待至指定的时间点TimeDeltaSensor- 等待指定的时间跨度ExternalTaskSensor- 等待另一个 DAG 中的任务完成
要查看可用传感器的完整列表,请参阅 算子和 Hook 参考。