Dag 文件处理
Dag 文件处理指的是读取定义 Dag 的 Python 文件并将其存储,以便调度器能够调度这些 Dag。
Dag 文件处理涉及两个主要组件。DagFileProcessorManager 是一个无限循环的进程,用于判断哪些文件需要被处理;DagFileProcessorProcess 是一个独立进程,用于将单个文件转换为一个或多个 Dag 对象。
DagFileProcessorManager 会运行用户代码。因此,它以独立进程的方式运行,调用 airflow dag-processor CLI 命令。
DagFileProcessorManager 包含以下步骤
检查新文件:如果 Dag 上次刷新以来的时间已超过 refresh_interval,则更新文件路径列表
排除最近处理过的文件:排除在 min_file_process_interval 之内且未被修改的文件
将文件路径加入队列:把发现的文件加入文件路径队列
处理文件:为每个文件启动一个新的
DagFileProcessorProcess,最多不超过 parsing_processes收集结果:收集所有已结束的 Dag 处理器的结果
记录统计信息:打印统计信息并发送
dag_processing.total_parse_time
DagFileProcessorProcess 包含以下步骤
处理文件:整个过程必须在 dag_file_processor_timeout 之内完成
将 Dag 文件加载为 Python 模块:必须在 dagbag_import_timeout 之内完成
处理模块:在 Python 模块中查找 Dag 对象
返回 DagBag:向
DagFileProcessorManager提供已发现的 Dag 对象列表
微调你的 Dag 处理器性能
哪些因素会影响 Dag 处理器的性能
Dag 处理器负责持续解析 Dag 文件并与数据库中的 Dag 同步。要微调你的 Dag 处理器,需要考虑多个因素。
- 你的部署类型
用于共享 Dag 的文件系统类型(会影响持续读取 Dag 的性能)
文件系统的速度(在许多分布式云文件系统场景下,你可以付费获取更高的吞吐量/更快的文件系统)
用于处理的内存容量
可用的 CPU 核数
可用的网络带宽
- Dag 结构的逻辑与定义方式
Dag 文件的数量
每个文件中包含的 Dag 数量
Dag 文件的大小(记住 Dag 解析器会每隔若干秒读取并解析一次文件)
Dag 的复杂程度(即解析速度、任务和依赖的数量)
解析 Dag 文件时是否在顶层导入了大量库或执行了重量级处理(提示!不应该这样。参见 顶层 Python 代码)
- Dag 处理器的配置
Dag 处理器的实例数量
Dag 处理器中解析进程的数量
同一 Dag 重新解析之间的等待时间(该过程是持续进行的)
每次 Dag 处理器循环中运行的回调数量
如何对 Dag 处理器进行微调
Airflow 为你提供了大量“旋钮”用于性能微调,但究竟该调哪些旋钮取决于你的部署方式、Dag 结构、硬件资源以及业务期望。管理部署时,需要先确定要优化的目标:有的用户可以接受新 Dag 解析延迟 30 秒,以换取更低的 CPU 使用率;而有的用户则要求 Dag 在出现于目录时几乎立即解析,即使会导致更高的 CPU 消耗。
Airflow 让你自行决定调优方向,但你必须先明确对你而言最重要的性能维度,然后选择相应的旋钮并决定调高或调低。
一般而言,微调的思路与任何性能改进和优化相同(我们不推荐特定工具——只需使用你平时用来观察和监控系统的工具即可)。
使用合适的监控工具监测系统至关重要。本文不涉及具体的指标和工具,只说明应关注哪些资源类别,具体监控手段请遵循你的最佳实践。
明确对你最关键的性能维度(想要改进的地方)
观察系统瓶颈所在:CPU、内存、I/O 通常是主要限制因素
基于预期和观察结果决定下一步的改进方向,随后再次观察性能瓶颈。性能提升是一个迭代过程。
哪些资源可能限制 Dag 处理器的性能
以下几个资源使用领域需要特别关注
文件系统性能。Airflow Dag 处理器大量依赖对 Python 文件的解析(往往次数很多),这些文件通常位于共享文件系统上。Dag 处理器会不断读取并重新解析这些文件。相同的文件还需供 Worker 使用,因此常常存放于分布式文件系统(如 NFS、CIFS、EFS、GCS fuse、Azure Files 等)。这些文件系统有多种可调参数,超出本文范围。请监控文件系统的统计信息和使用情况,以判断性能瓶颈是否来源于文件系统。例如,有用户通过提升 EFS 的 IOPS(并支付相应费用)显著提升了 Dag 解析的稳定性和速度。
如果文件系统成为瓶颈,可考虑使用其他分发方式。将 Dag 打包进镜像或通过 GitSync 分发,都能让 Dag 处理器本地读取文件,避免使用分布式文件系统,从而获得最快的本地读取速度(尤其在机器使用高速 SSD 本地存储时)。这些方式各有特点,可能不适合所有场景,但当性能瓶颈来源于分布式文件系统时,它们通常是最佳方案。
数据库连接与数据库使用量在提升并行处理能力时也可能成为问题。Airflow 被称为“数据库连接需求大”——Dag 越多、并行处理越多,就需要打开越多数据库连接。对 MySQL 来说,这通常不是问题(其连接模型基于线程),但对 Postgres 来说,连接是进程级别的,可能会受限。因此,业界普遍共识是:在中等规模的基于 Postgres 的 Airflow 环境中,最佳做法是使用 PGBouncer 作为数据库代理。Apache Airflow Helm Chart 已内置对 PGBouncer 的支持。
CPU 使用率是文件处理器(即解析并执行 Python Dag 文件的进程)最关键的指标。因为 Dag 处理器会持续触发解析,当 Dag 数量很多时,CPU 消耗会显著提升。可以通过增大 min_file_process_interval 来降低 CPU 占用,但这会导致文件变更被检测的延迟变长,UI 中看到的更新以及调度器的执行都会变慢。优化 Dag 编写方式、避免在顶层导入外部数据源是降低 CPU 使用的根本办法。如果有更多 CPU 可用,可提升 parsing_processes 的数量。
Airflow 在追求更高性能时往往会占用大量内存。提升性能的常见手段是增加处理进程数量,而每个进程都会加载完整的 Python 解释器、导入大量类并使用临时内存。Airflow 通过 fork + copy‑on‑write 机制在一定程度上共享内存,但如果在 fork 后仍然动态导入新类,会产生额外的内存压力。请监控系统是否出现内存超额使用并触发 swap,swap 会导致性能骤降。观察内存使用时,重点关注
working memory(具体名称视部署而异),而不是总内存使用量。
你可以采取哪些措施来提升 Dag 处理器的性能
在了解资源使用情况后,可以考虑以下改进措施:
优化 Dag 顶层 Python 代码的逻辑与效率,降低解析复杂度。由于 Dag 会被持续解析,优化这段代码往往能带来巨大的提升,尤其是避免在解析时访问外部数据库等操作(应绝对避免)。顶层 Python 代码 章节阐述了最佳实践;降低 Dag 复杂度 文档提供了可供参考的降复杂度方向。
提升资源利用率。当系统中存在未被充分利用的容量(例如 CPU、内存、I/O、网络),可通过增加解析进程数量等方式提升性能,代价是相应资源使用率升高。
扩充硬件资源(例如 CPU 瓶颈或文件系统 I/O 已达极限时)。在很多情况下,Dag 处理器性能受限仅仅是因为硬件不足,除非共享数据库或文件系统本身成为瓶颈,否则增加硬件是唯一可行的方案。
尝试不同的 “Dag 处理器可调参数”。通常可以通过牺牲某一性能指标来提升另一指标,例如降低 CPU 使用可以通过延长文件处理间隔实现(但会导致新 Dag 出现延迟)。微调的本质是平衡多方面的需求。
有时可以略微更改 Dag 处理器的行为(例如改变解析排序顺序),以获得更适合特定部署的微调效果。
Dag 处理器配置选项
以下配置项可用于控制 Dag 处理器的各个方面。除此之外,你还可以在 配置参考 中的 [dag_processor] 部分找到其他非性能相关的参数。
file_parsing_sort_mode Dag 处理器会列出并排序 Dag 文件,以确定解析顺序。
min_file_process_interval 重新解析同一 Dag 文件的间隔秒数。Dag 文件会每隔
min_file_process_interval秒被解析一次。该间隔越短,CPU 使用率越高。parsing_processes Dag 处理器可以并行启动多个进程来解析 Dag 文件,此配置决定启动的进程数。