Apache Airflow 是一个以编程方式作者、调度和监控工作流的平台。工作流是处理一组数据的任务序列。你可以将工作流视为描述任务从未完成到完成的路径。另一方面,调度是计划、控制和优化何时执行特定任务的过程。

在 Apache Airflow 中编写工作流。

Airflow 使使用 Python 脚本轻松编写工作流。有向无环图(Directed Acyclic Graph,简称 DAG)在 Airflow 中表示工作流。它是一系列任务的集合,以一种方式展示了每个任务的关系和依赖。你可以拥有任意数量的 DAG,Airflow 将根据任务的关系和依赖来执行它们。如果任务 B 依赖于任务 A 的成功执行,这意味着 Airflow 将运行任务 A,并且只在任务 A 之后运行任务 B。这种依赖关系在 Airflow 中非常容易表达。例如,上述场景表达为

task_A >> task_B

也等同于

task_A.set_downstream(task_B)

Simple Dag

这有助于 Airflow 知道它需要在任务 B 之前执行任务 A。任务之间的关系可以比上面表达的复杂得多,Airflow 会根据它们的关系和依赖来确定如何以及何时执行任务。 复杂 DAG

在我们讨论 Airflow 使工作流的调度、执行和监控变得容易的架构之前,让我们先讨论一下 Breeze 环境

Breeze 环境

Breeze 环境是 Airflow 的开发环境,您可以在其中运行测试、构建镜像、构建文档以及许多其他事情。有关于 Breeze 环境的出色文档和视频。请查阅。您可以通过运行 ./breeze 脚本进入 Breeze 环境。您可以在 Breeze 环境中运行此处提到的所有命令。

调度器

调度器是监控 DAG 并触发那些依赖关系已满足的任务的组件。它监视 DAG 文件夹,检查每个 DAG 中的任务,并在它们准备就绪时触发它们。它通过生成一个定期运行(大约每分钟一次)的进程来实现这一点,该进程读取元数据数据库以检查每个任务的状态并决定需要做什么。元数据数据库记录了所有任务的状态。状态可以是运行中、成功、失败等。

当任务的依赖关系得到满足时,该任务就被认为是准备就绪了。依赖关系包括执行任务所需的所有数据。需要注意的是,调度器不会触发您的任务,直到它涵盖的周期结束。如果任务的 schedule_interval@daily,调度器会在一天结束时触发任务,而不是在开始时。这是为了确保任务所需的数据已经准备好。也可以在 UI 上手动触发任务。

Breeze 环境中,通过运行命令 airflow scheduler 启动调度器。它使用配置的生产环境。配置可以在 airflow.cfg 中指定。

执行器

执行器负责运行任务。它们与调度器协作,在任务排队时获取运行任务所需的资源信息。

默认情况下,Airflow 使用序列化执行器(SequentialExecutor)。然而,这个执行器功能有限,并且是唯一可以与 SQLite 一起使用的执行器。

还有许多其他执行器,区别在于它们拥有的资源以及如何选择使用这些资源。可用的执行器包括

  • 序列化执行器(Sequential Executor)
  • 调试执行器(Debug Executor)
  • 本地执行器(Local Executor)
  • Dask 执行器(Dask Executor)
  • Celery 执行器(Celery Executor)
  • Kubernetes 执行器(Kubernetes Executor)
  • 使用 Mesos 进行扩展(社区贡献)

与 SequentialExecutor 相比,CeleryExecutor 是一个更好的执行器。CeleryExecutor 使用多个工作节点以分布式方式执行任务。如果一个工作节点宕机,CeleryExecutor 会将其任务分配给另一个工作节点。这确保了高可用性。

CeleryExecutor 与调度器密切协作,调度器将消息添加到队列中,然后由 Celery broker 将消息传递给 Celery worker 进行执行。您可以在文档中找到有关 CeleryExecutor 以及如何配置它的更多信息。

Web 服务器

Web 服务器是 Airflow 的 Web 界面(UI)。该 UI 功能丰富。它使监控和排除 DAG 和任务故障变得容易。

airflow UI

您可以在 UI 上执行许多操作。您可以触发任务、监控执行过程(包括任务的持续时间)。UI 可以树状视图和图状视图查看任务的依赖关系。您可以在 UI 中查看任务日志。

在 breeze 环境中,通过命令 airflow webserver 启动 Web UI。

后端

默认情况下,Airflow 使用 SQLite 后端存储配置信息、DAG 状态以及许多其他有用信息。这不应在生产环境中使用,因为 SQLite 可能导致数据丢失。

您可以使用 PostgreSQL 或 MySQL 作为 Airflow 的后端。切换到 PostgreSQL 或 MySQL 很简单。

在启动 breeze 环境时,命令 ./breeze --backend mysql 会选择 MySQL 作为后端。

Operator

Operator 决定任务做什么。Airflow 有许多内置的 Operator。每个 Operator 执行特定的任务。有执行 bash 命令的 BashOperator,调用 Python 函数的 PythonOperator,在 AWS Batch 上执行作业的 AwsBatchOperator,以及更多

Sensor

Sensor 可以被描述为用于监控长时间运行任务的特殊 Operator。就像 Operator 一样,Airflow 中有许多预定义的 Sensor。这些包括

  • AthenaSensor:询问查询的状态,直到其达到失败或成功状态。
  • AzureCosmosDocumentSensor:检查 CosmosDB 中是否存在与给定查询匹配的文档。
  • GoogleCloudStorageObjectSensor:检查 Google Cloud Storage 中是否存在文件。

大多数可用 Sensor 的列表可以在这个模块中找到。

贡献 Airflow

Airflow 是一个开源项目,欢迎所有人贡献。感谢出色的入门文档,入门非常容易。

我大约 12 周前通过 Outreachy 项目加入社区,并完成了大约 40 个拉取请求(PR)

这是一段非凡的经历!感谢我的导师 JarekKaxil,以及社区成员,特别是 KamilTomek 的所有支持。我非常感激!

非常感谢 Leah E. Cole 精彩的评论。

分享

另请阅读

Airflow Summit 2022

Jarek Potiuk

Airflow Summit 2022 来了

Airflow Summit 2021

Tomasz Urbaszek

我们对 Airflow Summit 2021 感到兴奋!