调度器

Airflow 调度器会监控所有任务和 DAG,并在它们的依赖完成后触发任务实例。幕后,调度器会启动一个子进程,持续监控并与指定 DAG 目录下的所有 DAG 保持同步。默认情况下,调度器每分钟会收集一次 DAG 解析结果,并检查是否有可触发的活跃任务。

Airflow 调度器旨在作为生产环境中的持久服务运行。启动它只需执行 airflow scheduler 命令。它会使用 airflow.cfg 中的配置。

调度器使用已配置的 Executor 来运行已就绪的任务。

要启动调度器,只需运行以下命令

airflow scheduler

当调度器成功运行后,你的 DAG 将开始执行。

注意

第一个 Dag Run 会依据 DAG 中任务的最早 start_date 创建。后续的 Dag Run 则根据你的 DAG 的 时间表 创建。

对于使用 cron 或 timedelta 作为调度方式的 DAG,调度器不会在周期结束前触发任务,例如将 schedule 设置为 @daily 的作业会在当天结束后才运行。此做法确保该时间段所需的数据在 DAG 执行前已全部可用。在 UI 中看起来像是 Airflow 延迟了一天 **执行** 你的任务。

注意

如果你将 DAG 的 schedule 设为一天一次,则在 2019-11-21 开始的数据区间会在 2019-11-21T23:59 之后触发。

再说一遍,调度器会在开始日期的 **一个调度周期之后**、在该区间**结束时**运行你的作业。

有关 DAG 调度的详细信息,请参阅 Dag Runs

注意

调度器旨在实现高吞吐量。这一设计决定是为了尽可能快速调度任务。调度器会检查池中可用的空槽数,并在一次迭代中最多调度与空槽数量相同的任务实例。这意味着只有当待调度的任务数超过队列槽数时,任务优先级才会生效。因此,在同一批次中,低优先级任务有时会在高优先级任务之前被调度。有关更多细节,请参考 此 GitHub 讨论

运行多个调度器

Airflow 支持同时运行多个调度器——既可提升性能,也可增强容错性。

概览

HA(高可用)调度器利用现有的元数据数据库实现高可用。这主要是为了运维简化:所有组件都已经需要与该数据库通信;且我们没有在调度器之间使用直接通信或共识算法(Raft、Paxos 等),也没有使用额外的共识工具(如 Apache Zookeeper、Consul),从而将“运维范围”降至最低。

调度器现在使用序列化的 DAG 表示来做调度决策,其调度循环的大致流程如下:

  • 检查是否有需要新建 DagRun 的 DAG,并创建它们

  • 遍历一批 DagRun,查找可调度的 TaskInstance 或已完成的 DagRun

  • 选取可调度的 TaskInstance,同时遵守池限制及其他并发限制,将它们加入执行队列

这对数据库提出了一些要求。

数据库要求

简而言之,使用 PostgreSQL 12+ 或 MySQL 8.0+ 的用户可直接启动任意数量的调度器——无需额外配置。如果你使用其他数据库,请继续阅读。

为保持性能与吞吐量,调度循环中有一段会在内存中完成若干计算(若对每个 TaskInstance 都去数据库回环将极其缓慢),因此必须保证同一时间只有一个调度器在该关键区段执行——否则并发限制将失效。为实现这一点,我们使用行级锁(通过 SELECT ... FOR UPDATE)。

该关键区段负责将 TaskInstance 从“已调度”状态转为写入执行器的队列,同时确保各种并发和池限制得到遵守。实现方式是对 Pool 表的每一行申请写锁(类似 SELECT * FROM slot_pool FOR UPDATE NOWAIT,实际查询略有差异)。

以下数据库得到完整支持并提供“最佳”体验:

  • PostgreSQL 12+

  • MySQL 8.0+

警告

MariaDB 在 10.6.0 之前未实现 SKIP LOCKEDNOWAIT 子句。缺少这些特性意味着不支持多调度器,并已报告死锁错误。MariaDB 10.6.0 及以上版本理论上可以配合多调度器使用,但尚未进行测试。

注意

Microsoft SQL Server 尚未在 HA 场景下进行测试。

微调调度器性能

影响调度器性能的因素

调度器负责持续为任务安排执行。要对调度器进行微调,需要综合考虑多种因素:

  • 部署方式
    • 可用内存

    • 可用 CPU

    • 网络吞吐量

  • Dag 结构的逻辑与定义
    • DAG 的数量

    • DAG 的复杂度(即任务数量以及依赖关系)

  • 调度器的配置
    • 调度器实例数量

    • 一次循环中调度器处理的任务实例数量

    • 每轮循环应创建/调度的新 Dag Run 数量

    • 调度器执行清理、检查孤立任务并收养它们的频率

要深入了解调度器内部工作原理,可观看 Airflow Summit 2021 的演讲 Deep Dive into the Airflow Scheduler,其中提供了微调的实战思路。

调度器微调的做法

Airflow 为性能微调提供了大量“旋钮”。究竟要调哪些、往哪儿调,取决于你的部署方式、Dag 结构、硬件资源和业务期望。管理部署时,需要先明确优化目标。

Airflow 赋予你自行决定的灵活性,但请先明确对你最重要的性能维度,然后针对该维度选择相应的调参方向。

一般而言,微调的思路与其他性能优化相同(我们不推荐特定工具——请使用平时用于系统观测和监控的工具)。

  • 使用熟悉的监控工具进行系统观察至关重要。本文不列出具体指标和工具,只概述应关注的资源类型,具体实现请遵循你们的监控最佳实践以获取有效数据。

  • 确定对您最重要的性能方面(即您想改进的目标)

  • 观察系统找出瓶颈——CPU、内存、I/O 往往是主要限制因素

  • 基于期望和观察结果,决定下一步改进方向,并回到性能观察、瓶颈定位的循环中。性能提升是一个迭代过程。

可能限制调度器性能的资源

以下资源使用情况值得关注:

  • 数据库连接数及数据库负载。随着并行度提升,Airflow 对数据库连接的需求会急剧增加——这被称为“数据库连接密集”。DAG 越多、并行度越高,打开的连接数就越多。对 MySQL 来说,基于线程的连接模型通常不是问题;但对 PostgreSQL(基于进程的连接模型)则可能出现瓶颈。业界普遍建议在规模达到中等以上的 PostgreSQL 环境中使用 PgBouncer 作为代理。Apache Airflow Helm Chart 已默认支持 PgBouncer。

  • Airflow 调度器可以几乎线性扩展——如果调度器的性能受 CPU 限制,可考虑增加调度器实例数量。

  • 查看内存使用时,请关注 “工作内存”(不同部署可能称呼不同),而非 “总内存占用”。

提升调度器性能的措施

在了解资源使用情况后,可考虑以下改进:

  • 提升资源利用率。当系统存在未被充分利用的空闲容量(CPU、内存、I/O、网络)时,可通过增加调度器数量或缩短调度间隔等方式提升性能,代价是更高的资源占用。

  • 增加硬件容量(例如 CPU 已成为瓶颈时)。调度器性能往往受限于硬件能力,此时唯一办法是升级硬件。比如机器的 CPU 已满负荷,可在新机器上再部署一个调度器——多数情况下,第二、第三个调度器会线性提升调度吞吐(除非共享数据库等成为新瓶颈)。

  • 尝试不同的 “调度器可调参数”。常常需要在性能指标之间进行权衡与取舍,找到最适合业务的平衡点。

调度器配置选项

以下配置项可用于控制调度器行为。除了性能相关的参数外,你还可以在 配置参考[scheduler] 区段中查看其他非性能相关的调度器配置。

  • max_dagruns_to_create_per_loop

    该参数决定每个调度器在创建 DagRun 时会锁定多少个 DAG。若你的 DAG 非常庞大(如每个 DAG 超过 10k 任务)且运行多个调度器,可通过调低此值防止单个调度器独占全部工作。

  • max_dagruns_per_loop_to_schedule

    每次调度循环中,调度器应检查(并锁定)多少个 DagRun 用于任务调度与排队。增大此值有助于提升小型 DAG 的吞吐,但可能导致大型 DAG(例如 >500 任务)吞吐下降。若使用多调度器,设置过高也会导致某一调度器独占所有 DagRun,导致其他调度器无事可做。

  • use_row_level_locking

    是否在相关查询中使用 SELECT ... FOR UPDATE 行级锁。若设为 False,则只能同时运行单个调度器。

  • pool_metrics_interval

    池使用统计信息发送至 StatsD(若开启 statsd_on)的间隔(秒)。该查询相对耗时,应与 StatsD 聚合周期保持一致。

  • ti_metrics_interval

    任务实例(已调度、已排队、运行中、已延期)统计发送至 StatsD 的间隔(秒)。该查询同样相对耗时,建议与 StatsD 聚合周期匹配。

  • orphaned_tasks_check_interval

    调度器检查孤立任务或失效 SchedulerJob 的间隔(秒)。

    该设置决定什么时间会检测到失效的调度器,并让另一调度器接管其“监护”的任务。任务仍在运行,因此短时间未检测到不会产生负面影响。

    当 SchedulerJob 被判定为 “失效”(由 scheduler_health_check_threshold 决定)时,任何由该进程启动的运行中或排队中的任务将被本调度器 “收养” 并继续监控。

  • max_tis_per_query:调度主循环中一次查询的批量大小。该值不应超过 core.parallelism。若设得过高,SQL 查询性能可能因谓词复杂度或锁竞争而受影响。

    此外,可能会受到数据库允许的最大查询长度限制。将其设为 0 则使用 core.parallelism 的值。

  • scheduler_idle_sleep_time 控制调度器在空轮循环时的睡眠时长;若本轮有调度任务,则会立即进入下一轮。此参数名称历史上已存在歧义,后续将在去除旧名后进行更名。

此条目是否有帮助?