Apache Airflow 是一个用于以编程方式编写、调度和监控工作流的平台。工作流是一系列处理一组数据的任务。你可以把工作流想象成描述任务从未完成到已完成的路径。另一方面,调度是规划、控制和优化特定任务何时执行的过程。

在 Apache Airflow 中编写工作流。

Airflow 通过 Python 脚本轻松编写工作流。一个 有向无环图(DAG)在 Airflow 中表示工作流。它是任务的集合,以展示每个任务的关系和依赖。你可以拥有任意数量的 DAG,Airflow 将根据任务的关系和依赖执行它们。如果任务 B 依赖于任务 A 的成功执行,这意味着 Airflow 会先运行任务 A,只有在任务 A 完成后才运行任务 B。这个依赖关系在 Airflow 中非常容易表达。例如,上述场景可以表示为

task_A >> task_B

同样等价于

task_A.set_downstream(task_B)

Simple Dag

这让 Airflow 知道需要先执行任务 A 再执行任务 B。任务之间的关系可以远比上述示例更为复杂,Airflow 会根据它们的关系和依赖来决定何时以及如何执行这些任务。复杂 DAG

在讨论让工作流的调度、执行和监控变得简单的 Airflow 架构之前,让我们先了解一下 Breeze 环境

Breeze 环境

Breeze 环境是 Airflow 的开发环境,你可以在其中运行测试、构建镜像、编写文档以及进行许多其他操作。关于 Breeze 环境有非常优秀的 文档和视频,请务必查看。通过运行 ./breeze 脚本即可进入 Breeze 环境。所有在本文中提到的命令都可以在 Breeze 环境中执行。

调度器

调度器是负责监控 DAG 并触发满足依赖条件的任务的组件。它会监视 DAG 文件夹,检查每个 DAG 中的任务,并在任务准备就绪时触发它们。调度器通过定期(大约每分钟)启动一个进程读取元数据数据库,以检查每个任务的状态并决定应执行的操作。元数据数据库记录了所有任务的状态,状态可以是运行中、成功、失败等。

当任务的依赖条件满足时,任务即被视为“就绪”。这些依赖包括任务执行所需的所有数据。需要注意的是,调度器不会在覆盖的时间段结束前触发任务。如果任务的 schedule_interval@daily,调度器会在当天结束时触发任务,而不是在一天开始时触发。这是为了确保任务所需的数据已经准备好。也可以在 UI 上手动触发任务。

Breeze 环境 中,调度器通过运行 airflow scheduler 命令启动。它使用已配置的生产环境。相关配置可在 airflow.cfg 中指定。

执行器

Executor 负责实际运行任务。它们与调度器协作,获取任务排队时所需的资源信息。

默认情况下,Airflow 使用 SequentialExecutor。然而,该执行器功能受限,并且是唯一可以与 SQLite 配合使用的执行器。

除此之外还有许多 Executor,它们的区别在于拥有的资源以及资源使用方式。可用的 Executor 包括:

  • Sequential Executor
  • Debug Executor
  • Local Executor
  • Dask Executor
  • Celery Executor
  • Kubernetes Executor
  • 使用 Mesos 扩展(社区贡献)

相较于 Sequential Executor,CeleryExecutor 是更好的执行器。CeleryExecutor 使用多个 worker 以分布式方式执行作业。如果某个 worker 节点宕机,CeleryExecutor 会将其任务分配给其他可用的 worker 节点,从而实现高可用性。

CeleryExecutor 与调度器紧密配合:调度器向队列中添加消息,Celery broker 再将消息投递给 Celery worker 执行。有关 CeleryExecutor 的更多信息以及如何配置,请参阅 官方文档

Web 服务器

Web 服务器提供 Airflow 的网页界面(UI)。该 UI 功能丰富,能够轻松监控和排查 DAG 与任务。

airflow UI

在 UI 上可以执行多种操作。你可以手动触发任务、监控任务执行及其耗时。UI 还能以树形视图和图形视图展示任务之间的依赖关系,并查看任务日志。

在 Breeze 环境中,使用 airflow webserver 命令启动 Web UI。

后端

默认情况下,Airflow 使用 SQLite 作为后端来存储配置信息、DAG 状态以及其他大量有用的数据。生产环境不应使用 SQLite,因为它可能导致数据丢失。

你可以把 PostgreSQL 或 MySQL 作为 Airflow 的后端。切换到 PostgreSQL 或 MySQL 非常容易。

通过运行 ./breeze --backend mysql 可以在启动 Breeze 环境时选择 MySQL 作为后端。

算子 (Operators)

Operator 决定任务要执行的具体操作。Airflow 自带大量内置 Operator,每个 Operator 完成特定任务。例如 BashOperator 用于执行 bash 命令,PythonOperator 调用 Python 函数,AwsBatchOperator 在 AWS Batch 上运行作业,更多请见 官方文档

传感器 (Sensors)

Sensor 可以视为一种特殊的 Operator,用于监控长期运行的任务。与普通 Operator 类似,Airflow 也提供了许多预定义的 Sensor,例如:

  • AthenaSensor:查询 Athena 作业状态,直至其进入失败或成功状态。
  • AzureCosmosDocumentSensor:检查 CosmosDB 中是否存在满足给定查询的文档。
  • GoogleCloudStorageObjectSensor:检查 Google Cloud Storage 中是否存在指定文件。

大多数可用的 Sensor 列表可在此 模块 中查阅。

为 Airflow 做贡献

Airflow 是开源项目,欢迎任何人参与贡献。得益于优秀的 入门文档,上手非常容易。

我约 12 周前通过 Outreachy 项目 加入社区,迄今已完成约 40 个 PR

这段经历非常棒!感谢我的导师 JarekKaxil,以及社区成员尤其是 KamilTomek 的大力支持。我由衷感激!

非常感谢 Leah E. Cole 的精彩审阅。

分享

阅读更多

介绍 Apache Airflow 注册表

Kaxil Naik

Apache Airflow 注册表是一个可搜索的目录,包含 98 个提供者和 1600 多个模块——运算符、钩子、传感器、触发器等——现已在 airflow.apache.org 在线提供。

Airflow Summit 2022

Jarek Potiuk

Airflow Summit 2022 已经开启