事件驱动调度¶
版本 3.0 中新增。
Apache Airflow 支持事件驱动调度,使得 DAG 可以根据外部事件触发,而不是预定义的时间表。这在现代数据架构中特别有用,因为工作流需要对实时数据变化、消息或系统信号做出反应。
通过使用资产(assets),如 Asset 感知调度 中所述,你可以配置 DAG 在特定外部事件发生时开始执行。资产提供了一种机制来建立外部事件和 DAG 执行的依赖关系,确保工作流能够动态地响应外部环境的变化。
AssetWatcher
类在此机制中起着关键作用。它监控外部事件源(例如消息队列),并在相关事件发生时触发资产更新。在 Asset
定义中的 watchers
参数允许你将多个 AssetWatcher
实例与一个资产关联,使其能够响应各种事件源。
请参阅 common.messaging provider 文档 了解更多信息和示例。
事件驱动调度支持的 trigger¶
并非 Airflow 中的所有 trigger 都可用于事件驱动调度。与继承自 BaseTrigger
的所有 trigger 不同,只有继承自 BaseEventTrigger
的一部分 trigger 是兼容的。此限制的原因是某些 trigger 并非为事件驱动调度而设计,使用它们来调度 DAG 可能会导致意外结果。
BaseEventTrigger
确保用于调度的 trigger 遵循事件驱动范式,适当地响应外部事件变化而不会导致意外的 DAG 行为。
编写与事件驱动兼容的 trigger¶
要使 trigger 与事件驱动调度兼容,它必须继承自 BaseEventTrigger
。在这种情况下,使用 trigger 主要有三种场景:
1. 创建新的事件驱动 trigger:如果你需要针对不受支持的事件源创建新的 trigger,则应创建继承自 BaseEventTrigger
的新类并实现其逻辑。
2. 调整现有的兼容 trigger:如果现有的 trigger(继承自 BaseTrigger
)已被证明与事件驱动调度兼容,那么你只需将其基类从 BaseTrigger
更改为 BaseEventTrigger
。
3. 调整现有的不兼容 trigger:如果现有的 trigger 似乎与事件驱动调度不兼容,那么必须创建一个新的 trigger。这个新的 trigger 必须继承 BaseEventTrigger
并确保它能正确地与事件驱动调度配合使用。如果两个 trigger 共享一些公共代码,它也可以继承现有的 trigger。
避免无限调度¶
某些 trigger 与事件驱动调度不兼容的原因是它们正在等待外部资源达到给定状态。例如:
等待存储服务中文件存在
等待作业处于成功状态
等待数据库中存在行
在这种条件下进行调度可能导致无限次调度。这是因为一旦条件变为真,它很可能在很长时间内保持真。
例如,考虑一个 DAG,它被调度在特定作业达到“成功”状态时运行。一旦作业成功,它通常会保持该状态。因此,每当 triggerer 检查条件时,该 DAG 都会被重复触发。
另一个例子是 S3KeyTrigger
,它检查 S3 存储桶中是否存在特定文件。文件创建后,由于条件“文件 X 是否存在于存储桶 Y 中”保持为真,trigger 在每次检查时都会继续成功。这导致每次 trigger 机制运行时,DAG 都会被无限期地触发。
创建自定义 trigger 时,请谨慎使用一旦满足就永久保持为真的条件。这可能会无意中导致无限次的 DAG 执行并使你的系统过载。
事件驱动 DAG 的使用案例¶
数据摄入管道:当新数据到达存储系统时触发 ETL 工作流。
机器学习工作流:在新数据集可用时启动模型训练。
物联网 (IoT) 和实时分析:实时响应传感器数据、日志或应用程序事件。
微服务和事件驱动架构:基于服务间消息编排工作流。