Dag 序列化
为了使 Airflow Webserver 无状态,Airflow >=1.10.7 支持 Dag 序列化和数据库持久化。从 Airflow 2.0.0 起,Scheduler 也使用序列化的 Dag 来保持一致性并做调度决策。
如果没有 Dag 序列化和数据库持久化,Webserver 和 Scheduler 都需要访问 Dag 文件。Scheduler 和 Webserver 都会解析 Dag 文件。
通过 **Dag 序列化**,我们旨在使 Webserver 与 Dag 解析解耦,从而让 Webserver 非常轻量化。
如上图所示,使用此功能时,Scheduler 中的 DagFileProcessorProcess 解析 Dag 文件,将其序列化为 JSON 格式,并以 SerializedDagModel 模型保存到元数据数据库中。
现在 Webserver 不再需要再次解析 Dag 文件,而是读取 JSON 格式的序列化 Dag,反序列化后创建 DagBag 并在 UI 中展示。同时 Scheduler 在做调度决策时也不需要实际的 Dag 文件,而是使用包含所有调度所需信息的序列化 Dag(此功能是 Scheduler HA 的一部分,从 Airflow 2.0.0 起实现)。
Dag 序列化实现的关键特性之一是,WebServer 启动时不再一次性加载整个 DagBag,而是按需从 Serialized Dag 表中加载单个 Dag。这有助于降低 Webserver 的启动时间和内存占用,尤其在拥有大量 Dag 时效果显著。
您可以将源代码存储在数据库中,使 Webserver 完全独立于 Dag 文件。如果您的文件已经嵌入到 Docker 镜像中或以其他方式提供给 Webserver,则无需此操作。数据存储在 DagCode 模型中。
最后一个要素是渲染模板字段。启用序列化后,模板不会在请求时渲染,而是在任务在 worker 上执行前保存字段内容的副本。数据存储在 RenderedTaskInstanceFields 模型中。为限制数据库的过度增长,只保留最近的条目,较旧的将被清除。
注意
从 Airflow 2.0+ 起,Dag 序列化是强制要求,无法关闭。
Dag 序列化设置
在 airflow.cfg 中添加以下设置
[core]
# You can also update the following default configurations based on your needs
min_serialized_dag_update_interval = 30
num_dag_runs_to_retain_rendered_fields = 30
compress_serialized_dags = False
min_serialized_dag_update_interval:该标志设置在数据库中更新序列化 Dag 的最小间隔(秒)。这有助于降低数据库写入频率。num_dag_runs_to_retain_rendered_fields:控制保留最近多少次 dag 运行的 Rendered Task Instance Fields。较旧运行的记录将在任务执行期间被删除。compress_serialized_dags:该选项控制是否压缩 Serialized Dag 存入数据库。当集群中有非常大的 Dag 时非常有用。设为True时,将禁用 Dag 依赖视图。
如果您从 <1.10.7 升级 Airflow,请务必运行 airflow db migrate。
限制
当使用用户自定义过滤器和宏时,Webserver 中的 Rendered View 可能会对尚未执行的任务实例(TI)显示不正确的结果,因为它可能使用了 Webserver 无法访问的外部模块。在这种情况下,可使用
airflow tasks renderCLI 命令进行调试或测试模板字段的渲染。一旦任务执行开始,渲染的模板字段将被存入数据库的单独表中,随后 Webserver(Rendered View 选项卡)将显示正确的值。
注意
完全无状态的 Webserver 需要 Airflow >= 1.10.10。Airflow 1.10.7 到 1.10.9 在某些情况下仍需要访问 Dag 文件。更多信息:https://airflow.org.cn/docs/1.10.9/dag-serialization.html#limitations
使用不同的 JSON 库
若想使用除标准 json 库之外的其他 JSON 库(如 ujson),需在本地 Airflow 设置文件(airflow_local_settings.py)中定义一个 json 变量,示例如下
import ujson
json = ujson
有关如何配置本地设置的详细信息,请参阅 配置本地设置。
带默认值的 Dag 序列化(Airflow 3.1+)
从 Airflow 3.1 开始,Dag 序列化在 Task SDK 与 Airflow 服务器组件(Scheduler 与 API-Server)之间建立了带版本的契约。结合 Task Execution API,这实现了客户端与服务器组件的解耦,支持独立部署和升级,同时保持向后兼容并自动解析默认值。
默认值如何工作
当 Airflow 处理 Dag 时,它会按照特定的优先级顺序为服务器应用默认值
Schema 默认:Airflow 内置默认值(最低优先级)
Client 默认:SDK 特定的默认值
Dag default_args:Dag 级别设置(现有行为)
Partial arguments:MappedOperator 共享的值
Task values:显式任务设置(最高优先级)
这意味着您可以在不同层级设置默认值,越具体的设置会覆盖更通用的默认值。
JSON 结构
序列化的 Dag 现在包含一个 client_defaults 区段,里面存放常用的默认值
{
"__version": 3,
"client_defaults": {
"tasks": {
"retry_delay": 300.0,
"owner": "data_team"
}
},
"dag": {
"dag_id": "example_dag",
"default_args": {
"retries": 3
},
"tasks": [{
"task_id": "example_task",
"task_type": "BashOperator",
"_task_module": "airflow.operators.bash",
"bash_command": "echo hello",
"owner": "specific_owner"
}]
}
}
值的应用方式
在上述示例中,任务 example_task 将拥有以下最终值
retry_delay:300.0(来自 client_defaults.tasks)
owner:“data_team”(来自 client_defaults.tasks)
retries:3(来自 dag.default_args,覆盖 client_defaults)
bash_command:“echo hello”(显式任务值)
pool:“default_pool”(来自 schema defaults)
系统会自动沿层级向上查找,填充所有缺失的值。
MappedOperator 默认处理
MappedOperators(动态任务映射)同样参与默认值体系
# Dag Definition
BashOperator.partial(task_id="mapped_task", retries=2, owner="team_lead").expand(
bash_command=["echo 1", "echo 2", "echo 3"]
)
在此示例中,三个生成的任务实例将继承
retries:2(来自 partial arguments)
owner:“team_lead”(来自 partial arguments)
pool:“default_pool”(来自 client_defaults,因为在 partial 中未指定)
bash_command:“echo 1”“echo 2”或“echo 3”(分别来自 expand)
独立部署架构
解耦组件:序列化契约结合 Task Execution API 实现了完全的分离
服务器组件(Scheduler、API-Server):负责编排,不运行用户代码
客户端组件(Task SDK、Dag 处理器):在隔离环境中运行用户代码
关键收益
独立升级:升级服务器组件而不影响用户环境
版本兼容性:单一服务器版本可同时支持多个 SDK 版本
部署灵活性:服务器和客户端组件可分别部署和扩展
安全隔离:用户代码仅在客户端环境运行,永不在服务器组件上执行
多语言 SDK 支持:任何语言都可实现符合规范的 Task SDK
SDK 要求:任意 Task SDK 实现必须
遵循已发布的 schema:- Dag 序列化:生成符合 schema 验证的 JSON。例如:
https://airflow.org.cn/schemas/dag-serialization/v2.json- 任务执行:通过 Execution API schema 支持运行时通信。例如:https://airflow.org.cn/schemas/execution-api/2025-05-20.json包含 client_defaults:可选地在
client_defaults.tasks区段提供 SDK 特定的默认值使用正确的版本控制:包含
__version字段以指示序列化格式
服务器保障:只要 SDK 符合两套 schema 合同,Airflow 服务器组件就会
正确地从任何符合规范的 SDK 反序列化 Dag
在运行时支持任务执行通信
根据层级应用适当的默认值
保持跨 SDK 版本和语言的兼容性
实现状态
当前状态(Airflow 3.1): 序列化契约为客户端/服务器解耦奠定了基础。尽管某些服务器组件仍包含 Task SDK 代码(反之亦然),但该契约确保
Schema 合规:在组件分离时实现独立部署
版本兼容性:无论代码是否耦合均能工作
部署分离:在架构上已得到支持,尽管尚未完全实现
未来演进:计划在未来版本实现服务器和客户端组件之间的完全代码解耦。schema 合约提供了稳固的接口,随着演进将保持一致。