Apache Airflow 是一个用于以编程方式创作、调度和监控工作流的平台。工作流是处理一组数据的任务序列。您可以将工作流视为描述任务如何从“未完成”状态转变为“完成”状态的路径。另一方面,调度是指规划、控制和优化特定任务何时完成的过程。
在 Apache Airflow 中创作工作流。
Airflow 使用 Python 脚本可以轻松地创作工作流。有向无环图(DAG)表示 Airflow 中的工作流。它是一组任务的集合,展示了每个任务的关系和依赖项。您可以拥有任意数量的 DAG,Airflow 将根据任务的关系和依赖项执行它们。如果任务 B 依赖于另一个任务 A 的成功执行,则意味着 Airflow 将运行任务 A,并且只有在任务 A 完成后才会运行任务 B。这种依赖关系在 Airflow 中很容易表达。例如,上述场景可以表示为
task_A >> task_B
也等价于
task_A.set_downstream(task_B)
这可以帮助 Airflow 知道需要在任务 B 之前执行任务 A。任务之间的关系可能比上面表达的要复杂得多,Airflow 会根据它们的关系和依赖项找出如何以及何时执行任务。
在我们讨论 Airflow 的架构(该架构使工作流的调度、执行和监控变得容易)之前,让我们先讨论一下 Breeze 环境。
Breeze 环境
Breeze 环境是 Airflow 的开发环境,您可以在其中运行测试、构建映像、构建文档等等。关于 Breeze 环境有优秀的文档和视频。请查看它们。您可以通过运行 ./breeze
脚本进入 Breeze 环境。您可以在 Breeze 环境中运行此处提到的所有命令。
调度器
调度器是监控 DAG 并触发那些满足依赖项的任务的组件。它监视 DAG 文件夹,检查每个 DAG 中的任务,并在任务准备就绪后触发它们。它通过生成一个定期(大约每分钟一次)运行的进程来实现这一点,该进程读取元数据数据库以检查每个任务的状态并决定需要做什么。元数据数据库是记录所有任务状态的地方。状态可以是运行中、成功、失败等。
当任务的依赖项都满足时,该任务被认为是准备就绪的。依赖项包括执行任务所需的所有数据。应该注意的是,调度器不会在它覆盖的时间段结束之前触发您的任务。如果任务的 schedule_interval
为 @daily
,则调度器会在一天结束时而不是开始时触发该任务。这是为了确保任务所需的数据已准备就绪。也可以在 UI 上手动触发任务。
在 Breeze 环境中,可以通过运行命令 airflow scheduler
启动调度器。它使用配置的生产环境。该配置可以在 airflow.cfg
中指定。
执行器
执行器负责运行任务。它们与调度器协同工作,以获取有关运行任务所需资源的信息,因为任务已排队。
默认情况下,Airflow 使用 SequentialExecutor。但是,此执行器是有限的,并且是唯一可以与 SQLite 一起使用的执行器。
还有许多其他执行器,它们之间的区别在于它们拥有的资源以及它们如何选择使用资源。可用的执行器有
- 顺序执行器
- 调试执行器
- 本地执行器
- Dask 执行器
- Celery 执行器
- Kubernetes 执行器
- 使用 Mesos 横向扩展(社区贡献)
与 SequentialExecutor 相比,CeleryExecutor 是一个更好的执行器。CeleryExecutor 使用多个工作进程以分布式方式执行作业。如果工作进程节点出现故障,CeleryExecutor 会将其任务分配给另一个工作进程节点。这可确保高可用性。
CeleryExecutor 与调度器密切合作,调度器将消息添加到队列中,Celery 代理将消息传递给 Celery 工作进程以执行。您可以在 文档中找到有关 CeleryExecutor 以及如何配置它的更多信息。
Web 服务器
Web 服务器是 Airflow 的 Web 界面(UI)。UI 功能丰富。它可以轻松地监控和排查 DAG 和任务的故障。
您可以在 UI 上执行许多操作。您可以触发任务,监控执行情况,包括任务的持续时间。UI 可以以树视图和图形视图查看任务的依赖关系。您可以在 UI 中查看任务日志。
在 breeze 环境中,使用命令 airflow webserver
启动 Web UI。
后端
默认情况下,Airflow 使用 SQLite 后端来存储配置信息、DAG 状态和许多其他有用的信息。不应在生产中使用此选项,因为 SQLite 可能会导致数据丢失。
您可以使用 PostgreSQL 或 MySQL 作为 Airflow 的后端。很容易更改为 PostgreSQL 或 MySQL。
当启动 breeze 环境时,命令 ./breeze --backend mysql
选择 MySQL 作为后端。
运算符
运算符决定了任务的执行内容。Airflow 有很多内置的运算符。每个运算符执行特定的任务。有一个执行 bash 命令的 BashOperator,调用 python 函数的 PythonOperator,在 AWS Batch 上执行作业的 AwsBatchOperator 以及更多。
传感器
传感器可以描述为用于监控长时间运行的任务的特殊运算符。与运算符一样,Airflow 中有许多预定义的传感器。其中包括
- AthenaSensor:请求查询的状态,直到它达到失败状态或成功状态。
- AzureCosmosDocumentSensor:检查 CosmosDB 中是否存在与给定查询匹配的文档
- GoogleCloudStorageObjectSensor:检查 Google Cloud Storage 中是否存在文件
可以在此模块中找到大多数可用传感器的列表
为 Airflow 做贡献
Airflow 是一个开源项目,欢迎所有人做出贡献。由于有关于如何入门的优秀文档,因此入门很容易。
大约 12 周前,我通过 Outreachy 计划加入了社区,并且已经完成了大约 40 个 PR。
这是一次令人惊奇的体验!感谢我的导师 Jarek 和 Kaxil,以及社区成员,尤其是 Kamil 和 Tomek 的所有支持。我非常感激!
非常感谢 Leah E. Cole 的精彩评论。
分享