调度器¶
Airflow 调度器监控所有任务和 DAG,并在其依赖项完成后触发任务实例。在幕后,调度器会启动一个子进程,该进程监控指定 DAG 目录中的所有 DAG 并与之保持同步。默认情况下,调度器每分钟收集一次 DAG 解析结果,并检查是否有任何活动任务可以触发。
Airflow 调度器设计为在 Airflow 生产环境中作为持久服务运行。要启动它,您只需要执行 airflow scheduler
命令。它使用 airflow.cfg
中指定的配置。
调度器使用配置的 Executor(执行器) 来运行就绪的任务。
要启动调度器,只需运行命令
airflow scheduler
一旦调度器成功运行,您的 DAG 将开始执行。
注意
第一个 DAG Run 是根据您 DAG 中任务的最小 start_date
创建的。随后的 DAG Run 将根据您 DAG 的 timetable(时间表) 创建。
对于具有 cron 或 timedelta 调度的 DAG,调度器只有在其涵盖的时间段结束后才会触发您的任务,例如,设置 schedule
为 @daily
的任务将在一天结束后运行。这种技术确保在该时间段所需的任何数据在 DAG 执行之前完全可用。在 UI 中,看起来 Airflow 似乎将您的任务延迟了一天执行。
注意
如果您将 DAG 的 schedule
设置为一天,那么数据时间间隔从 2019-11-21
开始的运行将在 2019-11-21T23:59
之后触发。
我们再重复一遍,调度器在开始日期之后一个 schedule
周期,在间隔的结束时运行您的任务。
有关调度 DAG 的详细信息,您应参考 DAG Run(DAG 运行)。
注意
调度器旨在实现高吞吐量。这是一个经过深思熟虑的设计决策,旨在尽快调度任务。调度器检查池 (pool) 中有多少可用插槽,并在一次迭代中最多调度相应数量的任务实例。这意味着只有当等待调度的任务多于队列插槽时,任务优先级才会生效。因此,如果低优先级任务与高优先级任务共享同一个批次,可能会出现低优先级任务在更高优先级任务之前被调度的情况。要了解更多信息,您可以参考这个 GitHub 讨论。
运行多个调度器¶
Airflow 支持同时运行多个调度器——既为了性能原因,也为了提高弹性。
概述¶
HA 调度器旨在利用现有的元数据数据库。这主要是为了操作的简单性:每个组件都已经需要与该数据库通信,并且通过不使用调度器之间的直接通信或共识算法(Raft, Paxos 等),也不使用其他共识工具(例如 Apache Zookeeper 或 Consul),我们将“操作面”保持在最低限度。
调度器现在使用序列化的 DAG 表示来进行调度决策,调度循环的大致流程是
检查是否有任何 DAG 需要新的 DagRun 并创建它们
检查一批 DagRun,查找可调度的 TaskInstances 或已完成的 DagRun
选择可调度的 TaskInstances,并在尊重 Pool 限制及其他并发限制的情况下,将其排队等待执行
但是,这确实对数据库提出了一些要求。
数据库要求¶
简而言之,PostgreSQL 12+ 或 MySQL 8.0+ 的用户都已准备就绪——您可以根据需要运行任意数量的调度器副本——无需进一步的设置或配置选项。如果您使用其他数据库,请继续阅读。
为了保持性能和吞吐量,调度循环中有一部分在内存中进行大量计算(因为每次 TaskInstance 都需要往返数据库会太慢),因此我们需要确保在任何时候只有一个调度器处于此关键区域——否则限制将无法得到正确遵守。为实现这一点,我们使用数据库行级锁(使用 SELECT ... FOR UPDATE
)。
这个关键区域是 TaskInstances 从调度状态转移到执行器队列的地方,同时确保各种并发和 Pool 限制得到遵守。通过请求对 Pool 表的每一行进行行级写锁来获得关键区域(大致相当于 SELECT * FROM slot_pool FOR UPDATE NOWAIT
,但确切的查询略有不同)。
以下数据库完全支持并提供“最优”体验
PostgreSQL 12+
MySQL 8.0+
警告
MariaDB 直到 10.6.0 版本才实现了 SKIP LOCKED
或 NOWAIT
SQL 子句。如果没有这些功能,则不支持运行多个调度器,并且已报告了死锁错误。MariaDB 10.6.0 及后续版本可能与多个调度器正常配合工作,但这尚未经过测试。
注意
Microsoft SQL Server 尚未与 HA 一起进行过测试。
微调调度器性能¶
影响调度器性能的因素¶
调度器负责持续调度任务以供执行。为了微调您的调度器,您需要考虑多个因素
- 您的部署类型
您有多少可用内存
您有多少可用 CPU
您有多少可用网络吞吐量
- 您的 DAG 结构的逻辑和定义
您有多少个 DAG
它们的复杂程度(即它们有多少任务和依赖项)
- 调度器配置
您有多少个调度器
调度器在一个循环中处理多少任务实例
每个循环应该创建/调度多少新的 DAG 运行
调度器应该多久执行一次清理并检查孤立任务/认领它们
为了进行微调,最好了解调度器在底层是如何工作的。您可以观看 Airflow Summit 2021 的“深入了解 Airflow 调度器”演讲来进行微调。
如何进行调度器微调¶
Airflow 提供了许多“旋钮”供您调整以微调性能,但根据您的具体部署、DAG 结构、硬件可用性和期望,决定调整哪些“旋钮”才能获得最佳效果,这是一个单独的任务。管理部署的一部分工作是决定您将优化哪些方面。
Airflow 赋予您决定权,但您应该找出对您最重要的性能方面,并决定您要朝哪个方向调整哪些“旋钮”。
一般来说,进行微调的方法应与任何性能改进和优化一样(我们不会推荐任何特定工具——只需使用您通常用于观察和监控系统的工具)
使用您通常用于监控系统的工具集来监控您的系统至关重要。本文档不详细介绍您可以使用的具体指标和工具,它只描述了您应该监控哪些类型的资源,但您应该遵循您的最佳监控实践来获取正确的数据。
决定对您最重要的性能方面(您想改进什么)
观察您的系统,查看瓶颈所在:CPU、内存、I/O 是常见的限制因素
根据您的期望和观察结果,决定您的下一个改进是什么,并回到对您的性能、瓶颈的观察。性能改进是一个迭代过程。
可能限制调度器性能的资源¶
有几个资源使用方面您应该注意
随着您希望提高性能和并行处理更多事物,数据库连接和数据库使用可能会成为问题。Airflow 以“数据库连接饥渴”而闻名——您拥有的 DAG 越多,希望并行处理的事物越多,打开的数据库连接就越多。这对于 MySQL 通常不是问题,因为它的连接处理模型是基于线程的,但这对于 Postgres 可能是一个问题,因为它的连接处理是基于进程的。一般认为,即使是中等规模的基于 Postgres 的 Airflow 安装,最好的解决方案是使用 PGBouncer 作为您数据库的代理。Apache Airflow 的 Helm Chart 开箱即用地支持 PGBouncer。
Airflow 调度器在多个实例下几乎线性扩展,因此如果您的调度器性能受限于 CPU,您也可以添加更多调度器。
在查看内存使用情况时,请务必注意您正在观察的是哪种类型的内存。通常您应该查看
working memory
(工作内存,名称可能因您的部署而异),而不是total memory used
(总已用内存)。
您可以做些什么来提高调度器性能¶
当您了解自己的资源使用情况时,您可以考虑的改进措施可能包括
提高资源利用率。这指的是您的系统中存在看似未充分利用的闲置容量(同样,CPU、内存、I/O、网络是主要候选项)——您可以采取增加调度器数量或缩短间隔以更频繁地执行操作等措施,这可能会提高性能,但代价是更高的资源利用率。
增加硬件容量(例如,如果您发现 CPU 是您的瓶颈)。调度器性能问题通常只是因为您的系统不够“强大”,这可能是唯一的解决方案。例如,如果您发现机器上的所有 CPU 都已被占用,您可能需要在新机器上添加另一个调度器——在大多数情况下,当您添加第 2 个或第 3 个调度器时,调度能力会线性增长(除非共享数据库或类似资源成为瓶颈)。
尝试不同的“调度器可调参数”值。通常,您可以通过简单地权衡一个性能方面来换取另一个方面,从而获得更好的效果。性能调优通常是平衡不同方面的艺术。
调度器配置选项¶
以下配置设置可用于控制调度器的各个方面。但是,您也可以查看 配置参考 中 [scheduler]
部分提供的其他与性能无关的调度器配置参数。
max_dagruns_to_create_per_loop
这改变了每个调度器在创建 DAG 运行时锁定的 DAG 数量。设置此参数较低的一个可能原因是,如果您有大型 DAG(每个 DAG 包含 1 万个以上的任务)并运行多个调度器,您不会希望一个调度器完成所有工作。
max_dagruns_per_loop_to_schedule
调度器在调度和排队任务时应该检查(并锁定)多少个 DagRun。增加此限制将允许小型 DAG 获得更高的吞吐量,但可能会降低大型 DAG(例如,超过 500 个任务)的吞吐量。在使用多个调度器时将此值设置得过高,也可能导致一个调度器接管所有 DAG 运行,而没有工作留给其他调度器。
-
调度器是否应在相关查询中发出
SELECT ... FOR UPDATE
。如果此设置为 False,则您不应同时运行多个调度器。 -
Pool 使用统计信息应该多久(以秒为单位)发送到 StatsD(如果启用了
statsd_on
)?计算此信息是一个相对耗时的查询,因此应将其设置为与您的 StatsD 汇总周期相同的周期。 -
运行中的任务实例统计信息应该多久(以秒为单位)发送到 StatsD(如果启用了
statsd_on
)?计算此信息是一个相对耗时的查询,因此应将其设置为与您的 StatsD 汇总周期相同的周期。 -
调度器应该多久(以秒为单位)检查一次孤立任务或死亡的 SchedulerJobs?
此设置控制如何检测到死亡的调度器,以及如何由另一个调度器接管其“监督”的任务。任务将继续运行,因此暂时不检测到死亡调度器也没有关系。
当一个 SchedulerJob 被检测为“死亡”(由 scheduler_health_check_threshold 决定)时,由该死亡进程启动的任何正在运行或排队的任务将被当前调度器“认领”并转为监控。
max_tis_per_query 调度主循环中查询的批处理大小。此值不应大于
core.parallelism
。如果此值过高,则 SQL 查询性能可能会受到查询谓词复杂性和/或过度锁定的影响。此外,您可能会达到数据库允许的最大查询长度。将此值设置为 0 以使用
core.parallelism
的值。scheduler_idle_sleep_time 控制调度器在循环之间休眠多久,但仅当该循环中没有任何任务可做时。也就是说,如果它调度了任务,则会立即开始下一个循环迭代。此参数命名不佳(出于历史原因),未来将弃用当前名称并进行重命名。