DAG 文件处理

DAG 文件处理指的是将 DAG 文件夹中包含的 Python 文件转换为包含要调度的任务的 DAG 对象的过程。

DAG 文件处理涉及两个主要组件。DagFileProcessorManager 是一个执行无限循环的进程,它确定哪些文件需要处理,而 DagFileProcessorProcess 是一个单独的进程,它被启动以将单个文件转换为一个或多个 DAG 对象。

DagFileProcessorManager 运行用户代码。因此,您可以决定在与调度器进程不同的主机中将其作为独立进程运行。如果您决定将其作为独立进程运行,则需要设置此配置:AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True 并运行 airflow dag-processor CLI 命令,否则,启动调度器进程 (airflow scheduler) 也会启动 DagFileProcessorManager

../_images/dag_file_processing_diagram.png

DagFileProcessorManager 具有以下步骤

  1. 检查新文件:如果自上次刷新 DAG 以来经过的时间 > dag_dir_list_interval,则更新文件路径列表

  2. 排除最近处理过的文件:排除处理时间比 min_file_process_interval 更短且未修改的文件

  3. 队列文件路径:将发现的文件添加到文件路径队列

  4. 处理文件:为每个文件启动一个新的 DagFileProcessorProcess,最多为 parsing_processes

  5. 收集结果:从任何已完成的 DAG 处理器收集结果

  6. 记录统计信息:打印统计信息并发出 dag_processing.total_parse_time

DagFileProcessorProcess 具有以下步骤

  1. 处理文件:整个过程必须在 dag_file_processor_timeout 内完成

  2. DAG 文件作为 Python 模块加载:必须在 dagbag_import_timeout 内完成

  3. 处理模块:在 Python 模块中查找 DAG 对象

  4. 返回 DagBag:向 DagFileProcessorManager 提供已发现的 DAG 对象列表

此条目是否对您有帮助?