动态 DAG 生成¶
本文档描述了如何创建动态生成结构的 DAG,但 DAG 中任务的数量在 DAG 运行之间不会改变。 如果你想实现一个 DAG,其中任务的数量(或 Airflow 2.6 中的任务组)可以根据先前任务的输出/结果而改变,请参阅 动态任务映射。
注意
生成任务和任务组的一致序列
在所有动态生成 DAG 的情况下,您应该确保每次生成 DAG 时,任务和任务组都以一致的顺序生成,否则每次刷新页面时,您可能会发现任务和任务组在网格视图中的顺序发生变化。 例如,可以通过在数据库查询中使用稳定的排序机制或在 Python 中使用 sorted()
函数来实现。
使用环境变量的动态 DAG¶
如果您想使用变量来配置您的代码,您应该始终在顶层代码中使用 环境变量,而不是使用 Airflow 变量。 在顶层代码中使用 Airflow 变量会建立与 Airflow 元数据数据库的连接来获取值,这会减慢解析速度并给数据库带来额外的负载。 请参阅 Airflow 变量的最佳实践,以便在使用 Jinja 模板的 DAG 中充分利用 Airflow 变量。
例如,您可以为您的生产和开发环境设置不同的 DEPLOYMENT
变量。 变量 DEPLOYMENT
在您的生产环境中可以设置为 PROD
,而在您的开发环境中可以设置为 DEV
。 然后,您可以根据环境变量的值,在生产和开发环境中以不同的方式构建您的 DAG。
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
生成带有嵌入元数据的 Python 代码¶
您可以外部生成包含元数据的 Python 代码作为可导入的常量。 然后,您的 DAG 可以直接导入这些常量,并用于构造对象和构建依赖项。 这样可以很容易地从多个 DAG 导入此类代码,而无需查找、加载和解析存储在常量中的元数据 - 这会在 Python 解释器处理“import”语句时自动完成。 这听起来很奇怪,但生成此类代码并确保它是您可以从 DAG 导入的有效 Python 代码非常容易。
例如,假设您动态生成(在您的 DAG 文件夹中)my_company_utils/common.py
文件
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
然后,您可以像这样在所有 DAG 中导入和使用 ALL_TASKS
常量
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
):
for task in ALL_TASKS:
# create your operators and relations here
...
不要忘记,在这种情况下,您需要在 my_company_utils
文件夹中添加空的 __init__.py
文件,并且您应该将 my_company_utils/.*
行添加到 .airflowignore
文件(如果使用 regexp 忽略语法),以便调度器在查找 DAG 时忽略整个文件夹。
使用来自结构化数据文件的外部配置的动态 DAG¶
如果您需要使用更复杂的元数据来准备您的 DAG 结构,并且您希望将数据保存在结构化的非 Python 格式中,您应该将数据导出到 DAG 文件夹中的文件中,并将其推送到 DAG 文件夹,而不是尝试通过 DAG 的顶层代码来提取数据 - 原因在父级 顶层 Python 代码 中进行了解释。
元数据应与 DAG 一起以方便的文件格式(JSON、YAML 格式是很好的选择)导出并存储在 DAG 文件夹中。 理想情况下,元数据应与您从中加载的 DAG 文件模块在同一包/文件夹中发布,因为这样您可以轻松地在您的 DAG 中找到元数据文件的位置。 可以使用包含 DAG 的模块的 __file__
属性找到要读取的文件的位置
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
注册动态 DAG¶
当使用 @dag
装饰器或 with DAG(..)
上下文管理器时,您可以动态生成 DAG,Airflow 会自动注册它们。
from datetime import datetime
from airflow.decorators import dag, task
configs = {
"config1": {"message": "first DAG will receive this message"},
"config2": {"message": "second DAG will receive this message"},
}
for config_name, config in configs.items():
dag_id = f"dynamic_generated_dag_{config_name}"
@dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
def dynamic_generated_dag():
@task
def print_message(message):
print(message)
print_message(config["message"])
dynamic_generated_dag()
下面的代码将为每个配置生成一个 DAG:dynamic_generated_dag_config1
和 dynamic_generated_dag_config2
。 它们中的每一个都可以使用相关配置单独运行。
如果您不希望自动注册 DAG,您可以通过在您的 DAG 上设置 auto_register=False
来禁用该行为。
在 2.4 版本中更改:从 2.4 版本开始,通过调用 @dag
装饰函数创建的 DAG(或在 with DAG(...)
上下文管理器中使用的 DAG)会自动注册,不再需要存储在全局变量中。
优化执行期间 DAG 解析延迟¶
2.4 版本中的新功能。
这是一个 实验性功能。
有时,当您从单个 DAG 文件生成大量动态 DAG 时,在任务执行期间解析 DAG 文件时可能会导致不必要的延迟。 其影响是任务启动之前的延迟。
为什么会发生这种情况? 您可能没有意识到,但在您的任务执行之前,Airflow 会解析 DAG 所在的 Python 文件。
Airflow 调度器(或者说是 DAG 文件处理器)需要加载完整的 DAG 文件才能处理所有元数据。 但是,任务执行只需要单个 DAG 对象来执行任务。 了解到这一点,我们可以在执行任务时跳过不必要的 DAG 对象的生成,从而缩短解析时间。 当生成的 DAG 数量较多时,此优化效果最为显著。
您可以采取一种实验性方法来优化此行为。 请注意,它并非总是可用(例如,当后续 DAG 的生成依赖于先前的 DAG 时),或者当您的 DAG 的生成存在一些副作用时。 此外,下面的代码片段非常复杂,虽然我们对其进行了测试,并且它在大多数情况下都有效,但在某些情况下,检测当前解析的 DAG 将会失败,并且它将恢复为创建所有 DAG 或失败。 请谨慎使用此解决方案并进行彻底测试。
您可以通过以下示例了解性能改进,该示例在 Airflow 的神奇循环博客文章中进行了展示,其中描述了如何将任务执行期间的解析从 120 秒减少到 200 毫秒。 (该示例是在 Airflow 2.4 之前编写的,因此它使用了 Airflow 的未记录行为。)
在 Airflow 2.4 中,您可以改为使用 get_parsing_context()
方法以记录且可预测的方式检索当前上下文。
在遍历要为其生成 DAG 的事物集合时,您可以使用上下文来确定是否需要生成所有 DAG 对象(在 DAG 文件处理器中解析时),还是仅生成单个 DAG 对象(在执行任务时)。
get_parsing_context()
返回当前解析上下文。 上下文的类型为 AirflowParsingContext
,如果只需要单个 DAG/任务,则它包含设置的 dag_id
和 task_id
字段。 如果需要“完整”解析(例如在 DAG 文件处理器中),则上下文的 dag_id
和 task_id
设置为 None
。
from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context
current_dag_id = get_parsing_context().dag_id
for thing in list_of_things:
dag_id = f"generated_dag_{thing}"
if current_dag_id is not None and current_dag_id != dag_id:
continue # skip generation of non-selected DAG
with DAG(dag_id=dag_id, ...):
...