Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册享早鸟票优惠!

DAG 文件处理

DAG 文件处理是指读取定义 DAG 的 Python 文件并存储它们的过程,以便调度器可以调度它们。

DAG 文件处理主要涉及两个组件。其中,DagFileProcessorManager 是一个执行无限循环的进程,它确定哪些文件需要被处理;而 DagFileProcessorProcess 是一个独立的进程,它被启动用于将单个文件转换为一个或多个 DAG 对象。

DagFileProcessorManager 运行用户代码。因此,它通过运行 airflow dag-processor CLI 命令作为独立进程运行。

../_images/dag_file_processing_diagram.png

DagFileProcessorManager 的步骤如下:

  1. 检查新文件:如果自上次刷新 DAG 以来经过的时间大于 dag_dir_list_interval (已弃用),则更新文件路径列表。

  2. 排除近期处理过的文件:排除最近处理时间早于 min_file_process_interval 且未被修改的文件。

  3. 文件路径入队:将发现的文件添加到文件路径队列。

  4. 处理文件:为每个文件启动一个新的 DagFileProcessorProcess,最多可达 parsing_processes 个。

  5. 收集结果:收集任何已完成的 DAG 处理器返回的结果。

  6. 记录统计信息:打印统计信息并发送 dag_processing.total_parse_time

DagFileProcessorProcess 的步骤如下:

  1. 处理文件:整个处理过程必须在 dag_file_processor_timeout 之内完成。

  2. 将 DAG 文件加载为 Python 模块:必须在 dagbag_import_timeout 之内完成。

  3. 处理模块:在 Python 模块中查找 DAG 对象。

  4. 返回 DagBag:向 DagFileProcessorManager 提供发现的 DAG 对象列表。

优化 DAG 处理器性能

影响 DAG 处理器性能的因素

DAG 处理器负责持续解析 DAG 文件并与数据库中的 DAG 同步。为了优化 DAG 处理器,需要考虑以下多种因素。

  • 您的部署类型
    • 用于共享 DAG 的文件系统类型(影响持续读取 DAG 的性能)

    • 文件系统的速度如何(在许多分布式云文件系统的情况下,可以通过额外付费获得更高的吞吐量/更快速的文件系统)

    • 您有多少可用于处理的内存

    • 您有多少可用的 CPU

    • 您有多少可用的网络吞吐量

  • DAG 结构的逻辑和定义
    • 您有多少个 DAG 文件

    • 您的文件中有多少个 DAG

    • DAG 文件有多大(记住 DAG 解析器需要每 n 秒读取和解析文件)

    • 它们有多复杂(即解析速度有多快,有多少任务和依赖关系)

    • 解析 DAG 文件是否涉及导入大量库或在顶层进行大量处理(提示!不应该这样做。请参阅 顶层 Python 代码

  • DAG 处理器配置
    • 您有多少个 DAG 处理器

    • 您的 DAG 处理器中有多少个解析进程

    • DAG 处理器在重新解析同一个 DAG 之间等待多长时间(这是持续发生的)

    • 每个 DAG 处理器循环中运行多少个回调

如何着手优化 DAG 处理器

Airflow 提供了许多“旋钮”供您调整以优化性能,但这取决于您的特定部署、DAG 结构、硬件可用性和期望,您需要单独决定调整哪些旋钮才能获得最佳效果。管理部署的一部分工作是决定您要优化哪些方面。例如,一些用户可以接受新 DAG 解析延迟 30 秒,以换取较低的 CPU 使用率;而另一些用户则希望 DAG 出现在 DAG 文件夹中时几乎立即被解析,但这会以较高的 CPU 使用率为代价。

Airflow 提供了灵活的决策空间,但是您应该弄清楚哪个性能方面对您来说最重要,并决定要朝哪个方向调整哪些旋钮。

通常,对于性能优化,您的方法应该与进行任何性能改进和优化时相同(我们不会推荐任何特定工具 - 只需使用您通常用来观察和监控系统的工具)。

  • 使用您通常用来监控系统的正确工具集来监控您的系统至关重要。本文档不详细介绍您可以使用的特定指标和工具,它只描述您应该监控哪种资源,但您应遵循自己的监控最佳实践来获取正确的数据。

  • 决定哪个性能方面对您来说最重要(您想要改进什么)

  • 观察您的系统以查看瓶颈所在:CPU、内存、I/O 通常是限制因素。

  • 根据您的期望和观察结果 - 决定您的下一步改进是什么,并回到对性能和瓶颈的观察。性能改进是一个迭代过程。

可能限制 DAG 处理器性能的资源

有几个资源使用方面需要您注意:

  • 文件系统性能。Airflow DAG 处理器严重依赖解析 Python 文件(有时是大量文件),这些文件通常位于共享文件系统上。DAG 处理器持续读取和重新解析这些文件。同样的文件必须对 Worker 可用,因此它们通常存储在分布式文件系统中。您可以使用各种文件系统来实现此目的(NFS、CIFS、EFS、GCS fuse、Azure File System 都是很好的例子)。您可以控制这些文件系统的各种参数并优化其性能,但这超出了本文档的范围。您应该观察文件系统的统计信息和使用情况,以确定问题是否来自文件系统性能。例如,有传闻证据表明,在使用 EFS 时,提高 EFS 性能的 IOPS(并为此支付更多费用)可以显着提高解析 Airflow DAG 的稳定性和速度。

  • 如果文件系统性能成为您的瓶颈,另一种解决方案是转向分发 DAG 的替代机制。将 DAG 嵌入到您的镜像中以及 GitSync 分发都具有文件对 DAG 处理器本地可用的特性,并且它不必使用分布式文件系统来读取文件,文件对 DAG 处理器本地可用,并且通常尽可能快,特别是如果您的机器使用快速 SSD 磁盘进行本地存储。这些分发机制具有其他特性,可能使其对您来说不是最佳选择,但如果您的性能问题来自分布式文件系统性能,它们可能是最佳方法。

  • 随着您想要提高性能并并行处理更多事物,数据库连接和数据库使用可能会成为问题。Airflow 因“数据库连接饥渴”而闻名 - 您拥有的 DAG 越多,想要并行处理的越多,就会打开更多的数据库连接。对于 MySQL 来说,这通常不是问题,因为其连接处理模型是基于线程的;但这对于 Postgres 来说可能是问题,因为其连接处理是基于进程的。普遍的共识是,即使是中等规模的基于 Postgres 的 Airflow 安装,最佳解决方案也是使用 PGBouncer 作为数据库代理。Apache Airflow 的 Helm Chart 开箱即用地支持 PGBouncer。

  • CPU 使用率对 FileProcessors 最重要 - 这些是解析和执行 Python DAG 文件的进程。由于 DAG 处理器通常持续触发此类解析,当您有大量 DAG 时,处理可能会占用大量 CPU。您可以通过增加 min_file_process_interval 来缓解,但这是一种权衡,结果是对此类文件的更改会被更慢地拾取,您将看到提交文件与它们在 Airflow UI 中可用并由调度器执行之间的延迟。优化 DAG 的构建方式,避免外部数据源是改善 CPU 使用率的最佳方法。如果您有更多可用的 CPU,您可以增加处理线程数 parsing_processes

  • 当您尝试从 Airflow 中榨取更多性能时,它可能会使用相当大的内存。在 Airflow 中,通常通过增加处理负载的进程数来获得更高的性能,每个进程都需要加载整个 Python 解释器、导入大量类和临时内存存储。Airflow 通过使用 forking 和写时复制 (copy-on-write) 内存来优化其中很多部分,但在 forking 后导入新类的情况下,这可能会导致额外的内存压力。您需要观察您的系统是否使用了超过其拥有的内存 - 这会导致使用交换空间,从而显着降低性能。查看内存使用情况时,请务必注意您正在观察的是哪种类型的内存。通常,您应该查看 工作内存 (working memory)(名称可能因您的部署而异),而不是 已使用的总内存 (total memory used)

如何改进 DAG 处理器性能

当您了解自己的资源使用情况后,可以考虑的改进措施可能包括:

  • 改进顶层 DAG Python 代码的逻辑、解析效率并降低其复杂性。它会被持续解析,因此优化该代码可能会带来巨大的改进,特别是如果您在解析 DAG 时尝试访问某些外部数据库等(应不惜一切代价避免这样做)。顶层 Python 代码 解释了编写顶层 Python 代码的最佳实践。降低 DAG 复杂性 文档提供了一些您在想要降低代码复杂性时可以关注的领域。

  • 提高资源利用率。当您的系统中存在未充分利用的空闲容量时(CPU、内存、I/O、网络再次是主要候选者),您可以采取增加解析进程数等措施,这可能会以提高这些资源的利用率为代价来改善性能。

  • 增加硬件容量(例如,如果您发现 CPU 是瓶颈或者用于 DAG 文件系统的 I/O 已达到极限)。通常,DAG 处理器性能问题仅仅是因为您的系统“能力”不足,这可能是唯一的解决方案,除非共享数据库或文件系统是瓶颈。

  • 试验“DAG 处理器可调参数”的不同值。通常,您可以通过简单地用一个性能方面交换另一个方面来获得更好的效果。例如,如果您想降低 CPU 使用率,可以增加文件处理间隔(但这会导致新 DAG 出现更大的延迟)。通常,性能调优是平衡不同方面的艺术。

  • 有时您可以稍微更改 DAG 处理器的行为(例如更改解析排序顺序),以便为您的特定部署获得更好的调优结果。

DAG 处理器配置选项

以下配置设置可用于控制调度器的各个方面。然而,您还可以查看其他与性能无关的调度器配置参数,这些参数可在 配置参考[scheduler] 部分找到。

  • file_parsing_sort_mode 调度器将列出并排序 DAG 文件以决定解析顺序。

  • min_file_process_interval DAG 文件重新解析的秒数间隔。DAG 文件每隔 min_file_process_interval 秒被解析一次。对 DAG 的更新在此间隔后反映。保持此值较低会增加 CPU 使用率。

  • parsing_processes 调度器可以并行运行多个进程来解析 DAG 文件。这定义了将运行多少个进程。

本条目有帮助吗?