DAG 序列化

为了使 Airflow Webserver 无状态,Airflow >=1.10.7 支持 DAG 序列化和 DB 持久性。从 Airflow 2.0.0 开始,调度程序也使用序列化的 DAG 以保持一致性并做出调度决策。

../_images/dag_serialization.png

如果没有 DAG 序列化和 DB 中的持久性,Webserver 和调度程序都需要访问 DAG 文件。调度程序和 Webserver 都解析 DAG 文件。

通过DAG 序列化,我们的目标是将 Webserver 与 DAG 解析分离,这将使 Webserver 非常轻量级。

如上图所示,使用此功能时,调度程序中的 DagFileProcessorProcess 会解析 DAG 文件,将其序列化为 JSON 格式并将其作为 SerializedDagModel 模型保存在元数据数据库中。

现在,Web 服务器不再需要再次解析 DAG 文件,而是读取 JSON 中序列化的 DAG,对它们进行反序列化,创建 DagBag 并使用它在 UI 中显示。调度程序不需要实际的 DAG 来做出调度决策,我们使用包含从 Airflow 2.0.0 调度 DAG 所需的所有信息的序列化 DAG,而不是使用 DAG 文件(这是 调度程序 HA 的一部分)。

DAG 序列化作为一部分实现的一个关键功能是,我们仅按需从序列化 DAG 表中加载每个 DAG,而不是在 Web 服务器启动时加载整个 DagBag。它有助于减少 Web 服务器的启动时间和内存。当您有大量 DAG 时,这种减少是显着的。

您可以启用将源代码存储在数据库中,以使 Web 服务器完全独立于 DAG 文件。如果您的文件嵌入在 Docker 映像中,或者您可以通过其他方式将它们提供给 Web 服务器,则没有必要这样做。数据存储在 DagCode 模型中。

最后一个元素是呈现模板字段。启用序列化时,不会将模板呈现给请求,而是在任务在工作程序上执行之前保存字段内容的副本。数据存储在 RenderedTaskInstanceFields 模型中。为了限制数据库的过度增长,仅保留最新条目,并清除较旧的条目。

注意

DAG 序列化是严格必需的,并且无法在 Airflow 2.0+ 中关闭。

DAG 序列化设置

airflow.cfg 中添加以下设置

[core]

# You can also update the following default configurations based on your needs
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
compress_serialized_dags = False
  • min_serialized_dag_update_interval:此标志设置数据库中序列化 DAG 的最小更新间隔(以秒为单位)。这有助于降低数据库写入速率。

  • min_serialized_dag_fetch_interval:此选项控制当序列化 DAG 已加载到 Web 服务器的 DagBag 中时,从数据库重新获取序列化 DAG 的频率。将此值设置得更高将降低数据库的负载,但代价是显示 DAG 的过时缓存版本。

  • max_num_rendered_ti_fields_per_task:此选项控制每个任务在数据库中存储的渲染任务实例字段(模板字段)的最大数量。

  • compress_serialized_dags:此选项控制是否将序列化 DAG 压缩到数据库。当集群中存在非常大的 DAG 时,此选项非常有用。当 True 时,这将禁用 DAG 依赖项视图。

如果您正在从 <1.10.7 更新 Airflow,请不要忘记运行 airflow db migrate

限制

  • 在使用用户定义的过滤器和宏时,Web 服务器中的渲染视图可能会显示尚未执行的 TI 的不正确结果,因为它可能正在使用 Web 服务器无法访问的外部模块。在这种情况下,使用 airflow tasks render CLI 命令来调试或测试模板字段的渲染。一旦任务执行开始,渲染模板字段将存储在数据库中的一个单独表中,之后将在 Web 服务器(渲染视图选项卡)中显示正确的值。

注意

对于完全无状态的 Web 服务器,您需要 Airflow >= 1.10.10。在某些情况下,Airflow 1.10.7 至 1.10.9 需要访问 DAG 文件。更多信息:https://airflow.org.cn/docs/1.10.9/dag-serialization.html#limitations

使用不同的 JSON 库

要使用标准 json 库(如 ujson)以外的其他 JSON 库,需要在本地 Airflow 设置(airflow_local_settings.py)文件中定义一个 json 变量,如下所示

import ujson

json = ujson

有关如何配置本地设置的详细信息,请参阅 配置本地设置

此条目是否有帮助?