Apache Airflow 是一个用于以编程方式编写、调度和监控工作流的平台。工作流是一系列处理一组数据的任务。你可以把工作流想象成描述任务从未完成到已完成的路径。另一方面,调度是规划、控制和优化特定任务何时执行的过程。
在 Apache Airflow 中编写工作流。
Airflow 通过 Python 脚本轻松编写工作流。一个 有向无环图(DAG)在 Airflow 中表示工作流。它是任务的集合,以展示每个任务的关系和依赖。你可以拥有任意数量的 DAG,Airflow 将根据任务的关系和依赖执行它们。如果任务 B 依赖于任务 A 的成功执行,这意味着 Airflow 会先运行任务 A,只有在任务 A 完成后才运行任务 B。这个依赖关系在 Airflow 中非常容易表达。例如,上述场景可以表示为
task_A >> task_B
同样等价于
task_A.set_downstream(task_B)

这让 Airflow 知道需要先执行任务 A 再执行任务 B。任务之间的关系可以远比上述示例更为复杂,Airflow 会根据它们的关系和依赖来决定何时以及如何执行这些任务。
在讨论让工作流的调度、执行和监控变得简单的 Airflow 架构之前,让我们先了解一下 Breeze 环境。
Breeze 环境
Breeze 环境是 Airflow 的开发环境,你可以在其中运行测试、构建镜像、编写文档以及进行许多其他操作。关于 Breeze 环境有非常优秀的 文档和视频,请务必查看。通过运行 ./breeze 脚本即可进入 Breeze 环境。所有在本文中提到的命令都可以在 Breeze 环境中执行。
调度器
调度器是负责监控 DAG 并触发满足依赖条件的任务的组件。它会监视 DAG 文件夹,检查每个 DAG 中的任务,并在任务准备就绪时触发它们。调度器通过定期(大约每分钟)启动一个进程读取元数据数据库,以检查每个任务的状态并决定应执行的操作。元数据数据库记录了所有任务的状态,状态可以是运行中、成功、失败等。
当任务的依赖条件满足时,任务即被视为“就绪”。这些依赖包括任务执行所需的所有数据。需要注意的是,调度器不会在覆盖的时间段结束前触发任务。如果任务的 schedule_interval 为 @daily,调度器会在当天结束时触发任务,而不是在一天开始时触发。这是为了确保任务所需的数据已经准备好。也可以在 UI 上手动触发任务。
在 Breeze 环境 中,调度器通过运行 airflow scheduler 命令启动。它使用已配置的生产环境。相关配置可在 airflow.cfg 中指定。
执行器
Executor 负责实际运行任务。它们与调度器协作,获取任务排队时所需的资源信息。
默认情况下,Airflow 使用 SequentialExecutor。然而,该执行器功能受限,并且是唯一可以与 SQLite 配合使用的执行器。
除此之外还有许多 Executor,它们的区别在于拥有的资源以及资源使用方式。可用的 Executor 包括:
- Sequential Executor
- Debug Executor
- Local Executor
- Dask Executor
- Celery Executor
- Kubernetes Executor
- 使用 Mesos 扩展(社区贡献)
相较于 Sequential Executor,CeleryExecutor 是更好的执行器。CeleryExecutor 使用多个 worker 以分布式方式执行作业。如果某个 worker 节点宕机,CeleryExecutor 会将其任务分配给其他可用的 worker 节点,从而实现高可用性。
CeleryExecutor 与调度器紧密配合:调度器向队列中添加消息,Celery broker 再将消息投递给 Celery worker 执行。有关 CeleryExecutor 的更多信息以及如何配置,请参阅 官方文档。
Web 服务器
Web 服务器提供 Airflow 的网页界面(UI)。该 UI 功能丰富,能够轻松监控和排查 DAG 与任务。

在 UI 上可以执行多种操作。你可以手动触发任务、监控任务执行及其耗时。UI 还能以树形视图和图形视图展示任务之间的依赖关系,并查看任务日志。
在 Breeze 环境中,使用 airflow webserver 命令启动 Web UI。
后端
默认情况下,Airflow 使用 SQLite 作为后端来存储配置信息、DAG 状态以及其他大量有用的数据。生产环境不应使用 SQLite,因为它可能导致数据丢失。
你可以把 PostgreSQL 或 MySQL 作为 Airflow 的后端。切换到 PostgreSQL 或 MySQL 非常容易。
通过运行 ./breeze --backend mysql 可以在启动 Breeze 环境时选择 MySQL 作为后端。
算子 (Operators)
Operator 决定任务要执行的具体操作。Airflow 自带大量内置 Operator,每个 Operator 完成特定任务。例如 BashOperator 用于执行 bash 命令,PythonOperator 调用 Python 函数,AwsBatchOperator 在 AWS Batch 上运行作业,更多请见 官方文档。
传感器 (Sensors)
Sensor 可以视为一种特殊的 Operator,用于监控长期运行的任务。与普通 Operator 类似,Airflow 也提供了许多预定义的 Sensor,例如:
- AthenaSensor:查询 Athena 作业状态,直至其进入失败或成功状态。
- AzureCosmosDocumentSensor:检查 CosmosDB 中是否存在满足给定查询的文档。
- GoogleCloudStorageObjectSensor:检查 Google Cloud Storage 中是否存在指定文件。
大多数可用的 Sensor 列表可在此 模块 中查阅。
为 Airflow 做贡献
Airflow 是开源项目,欢迎任何人参与贡献。得益于优秀的 入门文档,上手非常容易。
我约 12 周前通过 Outreachy 项目 加入社区,迄今已完成约 40 个 PR。
这段经历非常棒!感谢我的导师 Jarek 与 Kaxil,以及社区成员尤其是 Kamil 和 Tomek 的大力支持。我由衷感激!
非常感谢 Leah E. Cole 的精彩审阅。
分享